mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-28 18:40:38 +00:00
Compare commits
64 Commits
releases/2
...
release-pr
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
7736b748d3 | ||
|
|
7bae78186b | ||
|
|
7e560dd00e | ||
|
|
684e924211 | ||
|
|
8ace9ea25f | ||
|
|
6a4f49b08b | ||
|
|
c6e89445e2 | ||
|
|
04f32b9526 | ||
|
|
6f2333f52b | ||
|
|
d447f49bc3 | ||
|
|
c5972389aa | ||
|
|
c4f5736d5a | ||
|
|
518f598e2d | ||
|
|
4b711caf5e | ||
|
|
2cf47b1477 | ||
|
|
7dcfcccf7c | ||
|
|
a26cc29d92 | ||
|
|
5f2f31e879 | ||
|
|
938b163b42 | ||
|
|
5cbf5b45ae | ||
|
|
af5c54ed14 | ||
|
|
523cf71721 | ||
|
|
c47f355ec1 | ||
|
|
4f67b0225b | ||
|
|
2f7cecaf6a | ||
|
|
589594c2e1 | ||
|
|
70fe007519 | ||
|
|
b224a5a377 | ||
|
|
a65d437930 | ||
|
|
9c23333cb3 | ||
|
|
66a99009ba | ||
|
|
5d4c57491f | ||
|
|
73935ea3a2 | ||
|
|
32e595d4dd | ||
|
|
b0d69acb07 | ||
|
|
98355a419a | ||
|
|
cfb03d6cf0 | ||
|
|
d81ef3f962 | ||
|
|
5d62c67e75 | ||
|
|
53d53d5b1e | ||
|
|
29fe6ea47a | ||
|
|
640327ccb3 | ||
|
|
7cf0f6b37e | ||
|
|
03c2c569be | ||
|
|
eff6d4538a | ||
|
|
5ef7782e9c | ||
|
|
73101db8c4 | ||
|
|
bccdfc6d39 | ||
|
|
99595813bb | ||
|
|
fe07b54758 | ||
|
|
a42d173e7b | ||
|
|
e07f689238 | ||
|
|
7831eddc88 | ||
|
|
943b1bc80c | ||
|
|
95a184e9b7 | ||
|
|
3fa17e9d17 | ||
|
|
55e0fd9789 | ||
|
|
2a88889f44 | ||
|
|
5bad8126dc | ||
|
|
27bc242085 | ||
|
|
192b49cc6d | ||
|
|
e1b60f3693 | ||
|
|
2804f5323b | ||
|
|
676adc6b32 |
10
.github/workflows/_build-and-test-locally.yml
vendored
10
.github/workflows/_build-and-test-locally.yml
vendored
@@ -257,7 +257,15 @@ jobs:
|
||||
${cov_prefix} cargo nextest run $CARGO_FLAGS $CARGO_FEATURES -E 'package(remote_storage)' -E 'test(test_real_azure)'
|
||||
|
||||
- name: Install postgres binaries
|
||||
run: cp -a pg_install /tmp/neon/pg_install
|
||||
run: |
|
||||
# Use tar to copy files matching the pattern, preserving the paths in the destionation
|
||||
tar c \
|
||||
pg_install/v* \
|
||||
pg_install/build/*/src/test/regress/*.so \
|
||||
pg_install/build/*/src/test/regress/pg_regress \
|
||||
pg_install/build/*/src/test/isolation/isolationtester \
|
||||
pg_install/build/*/src/test/isolation/pg_isolation_regress \
|
||||
| tar x -C /tmp/neon
|
||||
|
||||
- name: Upload Neon artifact
|
||||
uses: ./.github/actions/upload
|
||||
|
||||
2
.github/workflows/cloud-regress.yml
vendored
2
.github/workflows/cloud-regress.yml
vendored
@@ -42,7 +42,7 @@ jobs:
|
||||
- name: Patch the test
|
||||
run: |
|
||||
cd "vendor/postgres-v${DEFAULT_PG_VERSION}"
|
||||
patch -p1 < "../../patches/cloud_regress_pg${DEFAULT_PG_VERSION}.patch"
|
||||
patch -p1 < "../../compute/patches/cloud_regress_pg${DEFAULT_PG_VERSION}.patch"
|
||||
|
||||
- name: Generate a random password
|
||||
id: pwgen
|
||||
|
||||
10
Cargo.lock
generated
10
Cargo.lock
generated
@@ -1321,7 +1321,6 @@ dependencies = [
|
||||
"clap",
|
||||
"comfy-table",
|
||||
"compute_api",
|
||||
"git-version",
|
||||
"humantime",
|
||||
"humantime-serde",
|
||||
"hyper 0.14.30",
|
||||
@@ -3578,7 +3577,6 @@ dependencies = [
|
||||
"anyhow",
|
||||
"camino",
|
||||
"clap",
|
||||
"git-version",
|
||||
"humantime",
|
||||
"pageserver",
|
||||
"pageserver_api",
|
||||
@@ -3617,7 +3615,6 @@ dependencies = [
|
||||
"enumset",
|
||||
"fail",
|
||||
"futures",
|
||||
"git-version",
|
||||
"hex",
|
||||
"hex-literal",
|
||||
"humantime",
|
||||
@@ -3737,7 +3734,6 @@ dependencies = [
|
||||
"clap",
|
||||
"criterion",
|
||||
"futures",
|
||||
"git-version",
|
||||
"hex-literal",
|
||||
"itertools 0.10.5",
|
||||
"once_cell",
|
||||
@@ -4307,7 +4303,6 @@ dependencies = [
|
||||
"fallible-iterator",
|
||||
"framed-websockets",
|
||||
"futures",
|
||||
"git-version",
|
||||
"hashbrown 0.14.5",
|
||||
"hashlink",
|
||||
"hex",
|
||||
@@ -5139,7 +5134,6 @@ dependencies = [
|
||||
"desim",
|
||||
"fail",
|
||||
"futures",
|
||||
"git-version",
|
||||
"hex",
|
||||
"humantime",
|
||||
"hyper 0.14.30",
|
||||
@@ -5702,7 +5696,6 @@ dependencies = [
|
||||
"futures",
|
||||
"futures-core",
|
||||
"futures-util",
|
||||
"git-version",
|
||||
"humantime",
|
||||
"hyper 0.14.30",
|
||||
"metrics",
|
||||
@@ -5730,7 +5723,6 @@ dependencies = [
|
||||
"diesel_migrations",
|
||||
"fail",
|
||||
"futures",
|
||||
"git-version",
|
||||
"hex",
|
||||
"humantime",
|
||||
"hyper 0.14.30",
|
||||
@@ -5783,7 +5775,6 @@ dependencies = [
|
||||
"either",
|
||||
"futures",
|
||||
"futures-util",
|
||||
"git-version",
|
||||
"hex",
|
||||
"humantime",
|
||||
"itertools 0.10.5",
|
||||
@@ -6715,6 +6706,7 @@ dependencies = [
|
||||
"criterion",
|
||||
"fail",
|
||||
"futures",
|
||||
"git-version",
|
||||
"hex",
|
||||
"hex-literal",
|
||||
"humantime",
|
||||
|
||||
@@ -195,7 +195,7 @@ metrics:
|
||||
-- Postgres creates temporary snapshot files of the form %X-%X.snap.%d.tmp. These
|
||||
-- temporary snapshot files are renamed to the actual snapshot files after they are
|
||||
-- completely built. We only WAL-log the completely built snapshot files.
|
||||
(SELECT COUNT(*) FROM pg_ls_logicalsnapdir() WHERE name LIKE '%.snap') AS num_logical_snapshot_files;
|
||||
(SELECT COUNT(*) FROM pg_ls_dir('pg_logical/snapshots') AS name WHERE name LIKE '%.snap') AS num_logical_snapshot_files;
|
||||
|
||||
# In all the below metrics, we cast LSNs to floats because Prometheus only supports floats.
|
||||
# It's probably fine because float64 can store integers from -2^53 to +2^53 exactly.
|
||||
@@ -244,4 +244,3 @@ metrics:
|
||||
SELECT slot_name,
|
||||
CASE WHEN wal_status = 'lost' THEN 1 ELSE 0 END AS wal_is_lost
|
||||
FROM pg_replication_slots;
|
||||
|
||||
|
||||
@@ -9,7 +9,6 @@ anyhow.workspace = true
|
||||
camino.workspace = true
|
||||
clap.workspace = true
|
||||
comfy-table.workspace = true
|
||||
git-version.workspace = true
|
||||
humantime.workspace = true
|
||||
nix.workspace = true
|
||||
once_cell.workspace = true
|
||||
|
||||
@@ -346,7 +346,14 @@ impl StorageController {
|
||||
let pg_log_path = pg_data_path.join("postgres.log");
|
||||
|
||||
if !tokio::fs::try_exists(&pg_data_path).await? {
|
||||
let initdb_args = ["-D", pg_data_path.as_ref(), "--username", &username()];
|
||||
let initdb_args = [
|
||||
"-D",
|
||||
pg_data_path.as_ref(),
|
||||
"--username",
|
||||
&username(),
|
||||
"--no-sync",
|
||||
"--no-instructions",
|
||||
];
|
||||
tracing::info!(
|
||||
"Initializing storage controller database with args: {:?}",
|
||||
initdb_args
|
||||
|
||||
@@ -4,8 +4,8 @@ use std::{str::FromStr, time::Duration};
|
||||
use clap::{Parser, Subcommand};
|
||||
use pageserver_api::{
|
||||
controller_api::{
|
||||
NodeAvailabilityWrapper, NodeDescribeResponse, NodeShardResponse, ShardSchedulingPolicy,
|
||||
TenantCreateRequest, TenantDescribeResponse, TenantPolicyRequest,
|
||||
AvailabilityZone, NodeAvailabilityWrapper, NodeDescribeResponse, NodeShardResponse,
|
||||
ShardSchedulingPolicy, TenantCreateRequest, TenantDescribeResponse, TenantPolicyRequest,
|
||||
},
|
||||
models::{
|
||||
EvictionPolicy, EvictionPolicyLayerAccessThreshold, LocationConfigSecondary,
|
||||
@@ -339,7 +339,7 @@ async fn main() -> anyhow::Result<()> {
|
||||
listen_pg_port,
|
||||
listen_http_addr,
|
||||
listen_http_port,
|
||||
availability_zone_id,
|
||||
availability_zone_id: AvailabilityZone(availability_zone_id),
|
||||
}),
|
||||
)
|
||||
.await?;
|
||||
|
||||
@@ -2,8 +2,8 @@
|
||||
# Example docker compose configuration
|
||||
|
||||
The configuration in this directory is used for testing Neon docker images: it is
|
||||
not intended for deploying a usable system. To run a development environment where
|
||||
you can experiment with a minature Neon system, use `cargo neon` rather than container images.
|
||||
not intended for deploying a usable system. To run a development environment where
|
||||
you can experiment with a miniature Neon system, use `cargo neon` rather than container images.
|
||||
|
||||
This configuration does not start the storage controller, because the controller
|
||||
needs a way to reconfigure running computes, and no such thing exists in this setup.
|
||||
|
||||
343
docs/rfcs/038-independent-compute-release.md
Normal file
343
docs/rfcs/038-independent-compute-release.md
Normal file
@@ -0,0 +1,343 @@
|
||||
# Independent compute release
|
||||
|
||||
Created at: 2024-08-30. Author: Alexey Kondratov (@ololobus)
|
||||
|
||||
## Summary
|
||||
|
||||
This document proposes an approach to fully independent compute release flow. It attempts to
|
||||
cover the following features:
|
||||
|
||||
- Process is automated as much as possible to minimize human errors.
|
||||
- Compute<->storage protocol compatibility is ensured.
|
||||
- A transparent release history is available with an easy rollback strategy.
|
||||
- Although not in the scope of this document, there is a viable way to extend the proposed release
|
||||
flow to achieve the canary and/or blue-green deployment strategies.
|
||||
|
||||
## Motivation
|
||||
|
||||
Previously, the compute release was tightly coupled to the storage release. This meant that once
|
||||
some storage nodes got restarted with a newer version, all new compute starts using these nodes
|
||||
automatically got a new version. Thus, two releases happen in parallel, which increases the blast
|
||||
radius and makes ownership fuzzy.
|
||||
|
||||
Now, we practice a manual v0 independent compute release flow -- after getting a new compute release
|
||||
image and tag, we pin it region by region using Admin UI. It's better, but it still has its own flaws:
|
||||
|
||||
1. It's a simple but fairly manual process, as you need to click through a few pages.
|
||||
2. It's prone to human errors, e.g., you could mistype or copy the wrong compute tag.
|
||||
3. We now require an additional approval in the Admin UI, which partially solves the 2.,
|
||||
but also makes the whole process pretty annoying, as you constantly need to go back
|
||||
and forth between two people.
|
||||
|
||||
## Non-goals
|
||||
|
||||
It's not the goal of this document to propose a design for some general-purpose release tool like Helm.
|
||||
The document considers how the current compute fleet is orchestrated at Neon. Even if we later
|
||||
decide to split the control plane further (e.g., introduce a separate compute controller), the proposed
|
||||
release process shouldn't change much, i.e., the releases table and API will reside in
|
||||
one of the parts.
|
||||
|
||||
Achieving the canary and/or blue-green deploy strategies is out of the scope of this document. They
|
||||
were kept in mind, though, so it's expected that the proposed approach will lay down the foundation
|
||||
for implementing them in future iterations.
|
||||
|
||||
## Impacted components
|
||||
|
||||
Compute, control plane, CI, observability (some Grafana dashboards may require changes).
|
||||
|
||||
## Prior art
|
||||
|
||||
One of the very close examples is how Helm tracks [releases history](https://helm.sh/docs/helm/helm_history/).
|
||||
|
||||
In the code:
|
||||
|
||||
- [Release](https://github.com/helm/helm/blob/2b30cf4b61d587d3f7594102bb202b787b9918db/pkg/release/release.go#L20-L43)
|
||||
- [Release info](https://github.com/helm/helm/blob/2b30cf4b61d587d3f7594102bb202b787b9918db/pkg/release/info.go#L24-L40)
|
||||
- [Release status](https://github.com/helm/helm/blob/2b30cf4b61d587d3f7594102bb202b787b9918db/pkg/release/status.go#L18-L42)
|
||||
|
||||
TL;DR it has several important attributes:
|
||||
|
||||
- Revision -- unique release ID/primary key. It is not the same as the application version,
|
||||
because the same version can be deployed several times, e.g., after a newer version rollback.
|
||||
- App version -- version of the application chart/code.
|
||||
- Config -- set of overrides to the default config of the application.
|
||||
- Status -- current status of the release in the history.
|
||||
- Timestamps -- tracks when a release was created and deployed.
|
||||
|
||||
## Proposed implementation
|
||||
|
||||
### Separate release branch
|
||||
|
||||
We will use a separate release branch, `release-compute`, to have a clean history for releases and commits.
|
||||
In order to avoid confusion with storage releases, we will use a different prefix for compute [git release
|
||||
tags](https://github.com/neondatabase/neon/releases) -- `release-compute-XXXX`. We will use the same tag for
|
||||
Docker images as well. The `neondatabase/compute-node-v16:release-compute-XXXX` looks longer and a bit redundant,
|
||||
but it's better to have image and git tags in sync.
|
||||
|
||||
Currently, control plane relies on the numeric compute and storage release versions to decide on compute->storage
|
||||
compatibility. Once we implement this proposal, we should drop this code as release numbers will be completely
|
||||
independent. The only constraint we want is that it must monotonically increase within the same release branch.
|
||||
|
||||
### Compute config/settings manifest
|
||||
|
||||
We will create a new sub-directory `compute` and file `compute/manifest.yaml` with a structure:
|
||||
|
||||
```yaml
|
||||
pg_settings:
|
||||
# Common settings for primaries and secondaries of all versions.
|
||||
common:
|
||||
wal_log_hints: "off"
|
||||
max_wal_size: "1024"
|
||||
|
||||
per_version:
|
||||
14:
|
||||
# Common settings for both replica and primary of version PG 14
|
||||
common:
|
||||
shared_preload_libraries: "neon,pg_stat_statements,extension_x"
|
||||
15:
|
||||
common:
|
||||
shared_preload_libraries: "neon,pg_stat_statements,extension_x"
|
||||
# Settings that should be applied only to
|
||||
replica:
|
||||
# Available only starting Postgres 15th
|
||||
recovery_prefetch: "off"
|
||||
# ...
|
||||
17:
|
||||
common:
|
||||
# For example, if third-party `extension_x` is not yet available for PG 17
|
||||
shared_preload_libraries: "neon,pg_stat_statements"
|
||||
replica:
|
||||
recovery_prefetch: "off"
|
||||
```
|
||||
|
||||
**N.B.** Setting value should be a string with `on|off` for booleans and a number (as a string)
|
||||
without units for all numeric settings. That's how the control plane currently operates.
|
||||
|
||||
The priority of settings will be (a higher number is a higher priority):
|
||||
|
||||
1. Any static and hard-coded settings in the control plane
|
||||
2. `pg_settings->common`
|
||||
3. Per-version `common`
|
||||
4. Per-version `replica`
|
||||
5. Any per-user/project/endpoint overrides in the control plane
|
||||
6. Any dynamic setting calculated based on the compute size
|
||||
|
||||
**N.B.** For simplicity, we do not do any custom logic for `shared_preload_libraries`, so it's completely
|
||||
overridden if specified on some level. Make sure that you include all necessary extensions in it when you
|
||||
do any overrides.
|
||||
|
||||
**N.B.** There is a tricky question about what to do with custom compute image pinning we sometimes
|
||||
do for particular projects and customers. That's usually some ad-hoc work and images are based on
|
||||
the latest compute image, so it's relatively safe to assume that we could use settings from the latest compute
|
||||
release. If for some reason that's not true, and further overrides are needed, it's also possible to do
|
||||
on the project level together with pinning the image, so it's on-call/engineer/support responsibility to
|
||||
ensure that compute starts with the specified custom image. The only real risk is that compute image will get
|
||||
stale and settings from new releases will drift away, so eventually it will get something incompatible,
|
||||
but i) this is some operational issue, as we do not want stale images anyway, and ii) base settings
|
||||
receive something really new so rarely that the chance of this happening is very low. If we want to solve it completely,
|
||||
then together with pinning the image we could also pin the matching release revision in the control plane.
|
||||
|
||||
The compute team will own the content of `compute/manifest.yaml`.
|
||||
|
||||
### Control plane: releases table
|
||||
|
||||
In order to store information about releases, the control plane will use a table `compute_releases` with the following
|
||||
schema:
|
||||
|
||||
```sql
|
||||
CREATE TABLE compute_releases (
|
||||
-- Unique release ID
|
||||
-- N.B. Revision won't by synchronized across all regions, because all control planes are technically independent
|
||||
-- services. We have the same situation with Helm releases as well because they could be deployed and rolled back
|
||||
-- independently in different clusters.
|
||||
revision BIGSERIAL PRIMARY KEY,
|
||||
-- Numeric version of the compute image, e.g. 9057
|
||||
version BIGINT NOT NULL,
|
||||
-- Compute image tag, e.g. `release-9057`
|
||||
tag TEXT NOT NULL,
|
||||
-- Current release status. Currently, it will be a simple enum
|
||||
-- * `deployed` -- release is deployed and used for new compute starts.
|
||||
-- Exactly one release can have this status at a time.
|
||||
-- * `superseded` -- release has been replaced by a newer one.
|
||||
-- But we can always extend it in the future when we need more statuses
|
||||
-- for more complex deployment strategies.
|
||||
status TEXT NOT NULL,
|
||||
-- Any additional metadata for compute in the corresponding release
|
||||
manifest JSONB NOT NULL,
|
||||
-- Timestamp when release record was created in the control plane database
|
||||
created_at TIMESTAMP NOT NULL DEFAULT now(),
|
||||
-- Timestamp when release deployment was finished
|
||||
deployed_at TIMESTAMP
|
||||
);
|
||||
```
|
||||
|
||||
We keep track of the old releases not only for the sake of audit, but also because we usually have ~30% of
|
||||
old computes started using the image from one of the previous releases. Yet, when users want to reconfigure
|
||||
them without restarting, the control plane needs to know what settings are applicable to them, so we also need
|
||||
information about the previous releases that are readily available. There could be some other auxiliary info
|
||||
needed as well: supported extensions, compute flags, etc.
|
||||
|
||||
**N.B.** Here, we can end up in an ambiguous situation when the same compute image is deployed twice, e.g.,
|
||||
it was deployed once, then rolled back, and then deployed again, potentially with a different manifest. Yet,
|
||||
we could've started some computes with the first deployment and some with the second. Thus, when we need to
|
||||
look up the manifest for the compute by its image tag, we will see two records in the table with the same tag,
|
||||
but different revision numbers. We can assume that this could happen only in case of rollbacks, so we
|
||||
can just take the latest revision for the given tag.
|
||||
|
||||
### Control plane: management API
|
||||
|
||||
The control plane will implement new API methods to manage releases:
|
||||
|
||||
1. `POST /management/api/v2/compute_releases` to create a new release. With payload
|
||||
|
||||
```json
|
||||
{
|
||||
"version": 9057,
|
||||
"tag": "release-9057",
|
||||
"manifest": {}
|
||||
}
|
||||
```
|
||||
|
||||
and response
|
||||
|
||||
```json
|
||||
{
|
||||
"revision": 53,
|
||||
"version": 9057,
|
||||
"tag": "release-9057",
|
||||
"status": "deployed",
|
||||
"manifest": {},
|
||||
"created_at": "2024-08-15T15:52:01.0000Z",
|
||||
"deployed_at": "2024-08-15T15:52:01.0000Z",
|
||||
}
|
||||
```
|
||||
|
||||
Here, we can actually mix-in custom (remote) extensions metadata into the `manifest`, so that the control plane
|
||||
will get information about all available extensions not bundled into compute image. The corresponding
|
||||
workflow in `neondatabase/build-custom-extensions` should produce it as an artifact and make
|
||||
it accessible to the workflow in the `neondatabase/infra`. See the complete release flow below. Doing that,
|
||||
we put a constraint that new custom extension requires new compute release, which is good for the safety,
|
||||
but is not exactly what we want operational-wise (we want to be able to deploy new extensions without new
|
||||
images). Yet, it can be solved incrementally: v0 -- do not do anything with extensions at all;
|
||||
v1 -- put them into the same manifest; v2 -- make them separate entities with their own lifecycle.
|
||||
|
||||
**N.B.** This method is intended to be used in CI workflows, and CI/network can be flaky. It's reasonable
|
||||
to assume that we could retry the request several times, even though it's already succeeded. Although it's
|
||||
not a big deal to create several identical releases one-by-one, it's better to avoid it, so the control plane
|
||||
should check if the latest release is identical and just return `304 Not Modified` in this case.
|
||||
|
||||
2. `POST /management/api/v2/compute_releases/rollback` to rollback to any previously deployed release. With payload
|
||||
including the revision of the release to rollback to:
|
||||
|
||||
```json
|
||||
{
|
||||
"revision": 52
|
||||
}
|
||||
```
|
||||
|
||||
Rollback marks the current release as `superseded` and creates a new release with all the same data as the
|
||||
requested revision, but with a new revision number.
|
||||
|
||||
This rollback API is not strictly needed, as we can just use `infra` repo workflow to deploy any
|
||||
available tag. It's still nice to have for on-call and any urgent matters, for example, if we need
|
||||
to rollback and GitHub is down. It's much easier to specify only the revision number vs. crafting
|
||||
all the necessary data for the new release payload.
|
||||
|
||||
### Compute->storage compatibility tests
|
||||
|
||||
In order to safely release new compute versions independently from storage, we need to ensure that the currently
|
||||
deployed storage is compatible with the new compute version. Currently, we maintain backward compatibility
|
||||
in storage, but newer computes may require a newer storage version.
|
||||
|
||||
Remote end-to-end (e2e) tests [already accept](https://github.com/neondatabase/cloud/blob/e3468d433e0d73d02b7d7e738d027f509b522408/.github/workflows/testing.yml#L43-L48)
|
||||
`storage_image_tag` and `compute_image_tag` as separate inputs. That means that we could reuse e2e tests to ensure
|
||||
compatibility between storage and compute:
|
||||
|
||||
1. Pick the latest storage release tag and use it as `storage_image_tag`.
|
||||
2. Pick a new compute tag built in the current compute release PR and use it as `compute_image_tag`.
|
||||
Here, we should use a temporary ECR image tag, because the final tag will be known only after the release PR is merged.
|
||||
3. Trigger e2e tests as usual.
|
||||
|
||||
### Release flow
|
||||
|
||||
```mermaid
|
||||
sequenceDiagram
|
||||
|
||||
actor oncall as Compute on-call person
|
||||
participant neon as neondatabase/neon
|
||||
|
||||
box private
|
||||
participant cloud as neondatabase/cloud
|
||||
participant exts as neondatabase/build-custom-extensions
|
||||
participant infra as neondatabase/infra
|
||||
end
|
||||
|
||||
box cloud
|
||||
participant preprod as Pre-prod control plane
|
||||
participant prod as Production control plane
|
||||
participant k8s as Compute k8s
|
||||
end
|
||||
|
||||
oncall ->> neon: Open release PR into release-compute
|
||||
|
||||
activate neon
|
||||
neon ->> cloud: CI: trigger e2e compatibility tests
|
||||
activate cloud
|
||||
cloud -->> neon: CI: e2e tests pass
|
||||
deactivate cloud
|
||||
neon ->> neon: CI: pass PR checks, get approvals
|
||||
deactivate neon
|
||||
|
||||
oncall ->> neon: Merge release PR into release-compute
|
||||
|
||||
activate neon
|
||||
neon ->> neon: CI: pass checks, build and push images
|
||||
neon ->> exts: CI: trigger extensions build
|
||||
activate exts
|
||||
exts -->> neon: CI: extensions are ready
|
||||
deactivate exts
|
||||
neon ->> neon: CI: create release tag
|
||||
neon ->> infra: Trigger release workflow using the produced tag
|
||||
deactivate neon
|
||||
|
||||
activate infra
|
||||
infra ->> infra: CI: pass checks
|
||||
infra ->> preprod: Release new compute image to pre-prod automatically <br/> POST /management/api/v2/compute_releases
|
||||
activate preprod
|
||||
preprod -->> infra: 200 OK
|
||||
deactivate preprod
|
||||
|
||||
infra ->> infra: CI: wait for per-region production deploy approvals
|
||||
oncall ->> infra: CI: approve deploys region by region
|
||||
infra ->> k8s: Prewarm new compute image
|
||||
infra ->> prod: POST /management/api/v2/compute_releases
|
||||
activate prod
|
||||
prod -->> infra: 200 OK
|
||||
deactivate prod
|
||||
deactivate infra
|
||||
```
|
||||
|
||||
## Further work
|
||||
|
||||
As briefly mentioned in other sections, eventually, we would like to use more complex deployment strategies.
|
||||
For example, we can pass a fraction of the total compute starts that should use the new release. Then we can
|
||||
mark the release as `partial` or `canary` and monitor its performance. If everything is fine, we can promote it
|
||||
to `deployed` status. If not, we can roll back to the previous one.
|
||||
|
||||
## Alternatives
|
||||
|
||||
In theory, we can try using Helm as-is:
|
||||
|
||||
1. Write a compute Helm chart. That will actually have only some config map, which the control plane can access and read.
|
||||
N.B. We could reuse the control plane chart as well, but then it's not a fully independent release again and even more fuzzy.
|
||||
2. The control plane will read it and start using the new compute version for new starts.
|
||||
|
||||
Drawbacks:
|
||||
|
||||
1. Helm releases work best if the workload is controlled by the Helm chart itself. Then you can have different
|
||||
deployment strategies like rolling update or canary or blue/green deployments. At Neon, the compute starts are controlled
|
||||
by control plane, so it makes it much more tricky.
|
||||
2. Releases visibility will suffer, i.e. instead of a nice table in the control plane and Admin UI, we would need to use
|
||||
`helm` cli and/or K8s UIs like K8sLens.
|
||||
3. We do not restart all computes shortly after the new version release. This means that for some features and compatibility
|
||||
purpose (see above) control plane may need some auxiliary info from the previous releases.
|
||||
@@ -1,4 +1,5 @@
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::fmt::Display;
|
||||
use std::str::FromStr;
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
@@ -57,7 +58,7 @@ pub struct NodeRegisterRequest {
|
||||
pub listen_http_addr: String,
|
||||
pub listen_http_port: u16,
|
||||
|
||||
pub availability_zone_id: String,
|
||||
pub availability_zone_id: AvailabilityZone,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
@@ -74,10 +75,19 @@ pub struct TenantPolicyRequest {
|
||||
pub scheduling: Option<ShardSchedulingPolicy>,
|
||||
}
|
||||
|
||||
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
|
||||
pub struct AvailabilityZone(pub String);
|
||||
|
||||
impl Display for AvailabilityZone {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "{}", self.0)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
pub struct ShardsPreferredAzsRequest {
|
||||
#[serde(flatten)]
|
||||
pub preferred_az_ids: HashMap<TenantShardId, String>,
|
||||
pub preferred_az_ids: HashMap<TenantShardId, AvailabilityZone>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
|
||||
@@ -37,14 +37,11 @@ use bytes::{Buf, BufMut, Bytes, BytesMut};
|
||||
/// ```mermaid
|
||||
/// stateDiagram-v2
|
||||
///
|
||||
/// [*] --> Loading: spawn_load()
|
||||
/// [*] --> Attaching: spawn_attach()
|
||||
///
|
||||
/// Loading --> Activating: activate()
|
||||
/// Attaching --> Activating: activate()
|
||||
/// Activating --> Active: infallible
|
||||
///
|
||||
/// Loading --> Broken: load() failure
|
||||
/// Attaching --> Broken: attach() failure
|
||||
///
|
||||
/// Active --> Stopping: set_stopping(), part of shutdown & detach
|
||||
@@ -68,10 +65,6 @@ use bytes::{Buf, BufMut, Bytes, BytesMut};
|
||||
)]
|
||||
#[serde(tag = "slug", content = "data")]
|
||||
pub enum TenantState {
|
||||
/// This tenant is being loaded from local disk.
|
||||
///
|
||||
/// `set_stopping()` and `set_broken()` do not work in this state and wait for it to pass.
|
||||
Loading,
|
||||
/// This tenant is being attached to the pageserver.
|
||||
///
|
||||
/// `set_stopping()` and `set_broken()` do not work in this state and wait for it to pass.
|
||||
@@ -121,8 +114,6 @@ impl TenantState {
|
||||
// But, our attach task might still be fetching the remote timelines, etc.
|
||||
// So, return `Maybe` while Attaching, making Console wait for the attach task to finish.
|
||||
Self::Attaching | Self::Activating(ActivatingFrom::Attaching) => Maybe,
|
||||
// tenant mgr startup distinguishes attaching from loading via marker file.
|
||||
Self::Loading | Self::Activating(ActivatingFrom::Loading) => Attached,
|
||||
// We only reach Active after successful load / attach.
|
||||
// So, call atttachment status Attached.
|
||||
Self::Active => Attached,
|
||||
@@ -191,10 +182,11 @@ impl LsnLease {
|
||||
}
|
||||
|
||||
/// The only [`TenantState`] variants we could be `TenantState::Activating` from.
|
||||
///
|
||||
/// XXX: We used to have more variants here, but now it's just one, which makes this rather
|
||||
/// useless. Remove, once we've checked that there's no client code left that looks at this.
|
||||
#[derive(Clone, Copy, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
|
||||
pub enum ActivatingFrom {
|
||||
/// Arrived to [`TenantState::Activating`] from [`TenantState::Loading`]
|
||||
Loading,
|
||||
/// Arrived to [`TenantState::Activating`] from [`TenantState::Attaching`]
|
||||
Attaching,
|
||||
}
|
||||
@@ -1562,11 +1554,8 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn tenantstatus_activating_serde() {
|
||||
let states = [
|
||||
TenantState::Activating(ActivatingFrom::Loading),
|
||||
TenantState::Activating(ActivatingFrom::Attaching),
|
||||
];
|
||||
let expected = "[{\"slug\":\"Activating\",\"data\":\"Loading\"},{\"slug\":\"Activating\",\"data\":\"Attaching\"}]";
|
||||
let states = [TenantState::Activating(ActivatingFrom::Attaching)];
|
||||
let expected = "[{\"slug\":\"Activating\",\"data\":\"Attaching\"}]";
|
||||
|
||||
let actual = serde_json::to_string(&states).unwrap();
|
||||
|
||||
@@ -1581,13 +1570,7 @@ mod tests {
|
||||
fn tenantstatus_activating_strum() {
|
||||
// tests added, because we use these for metrics
|
||||
let examples = [
|
||||
(line!(), TenantState::Loading, "Loading"),
|
||||
(line!(), TenantState::Attaching, "Attaching"),
|
||||
(
|
||||
line!(),
|
||||
TenantState::Activating(ActivatingFrom::Loading),
|
||||
"Activating",
|
||||
),
|
||||
(
|
||||
line!(),
|
||||
TenantState::Activating(ActivatingFrom::Attaching),
|
||||
|
||||
@@ -19,6 +19,7 @@ bincode.workspace = true
|
||||
bytes.workspace = true
|
||||
camino.workspace = true
|
||||
chrono.workspace = true
|
||||
git-version.workspace = true
|
||||
hex = { workspace = true, features = ["serde"] }
|
||||
humantime.workspace = true
|
||||
hyper = { workspace = true, features = ["full"] }
|
||||
|
||||
@@ -92,6 +92,10 @@ pub mod toml_edit_ext;
|
||||
|
||||
pub mod circuit_breaker;
|
||||
|
||||
// Re-export used in macro. Avoids adding git-version as dep in target crates.
|
||||
#[doc(hidden)]
|
||||
pub use git_version;
|
||||
|
||||
/// This is a shortcut to embed git sha into binaries and avoid copying the same build script to all packages
|
||||
///
|
||||
/// we have several cases:
|
||||
@@ -131,7 +135,7 @@ macro_rules! project_git_version {
|
||||
($const_identifier:ident) => {
|
||||
// this should try GIT_VERSION first only then git_version::git_version!
|
||||
const $const_identifier: &::core::primitive::str = {
|
||||
const __COMMIT_FROM_GIT: &::core::primitive::str = git_version::git_version! {
|
||||
const __COMMIT_FROM_GIT: &::core::primitive::str = $crate::git_version::git_version! {
|
||||
prefix = "",
|
||||
fallback = "unknown",
|
||||
args = ["--abbrev=40", "--always", "--dirty=-modified"] // always use full sha
|
||||
|
||||
@@ -27,7 +27,6 @@ crc32c.workspace = true
|
||||
either.workspace = true
|
||||
fail.workspace = true
|
||||
futures.workspace = true
|
||||
git-version.workspace = true
|
||||
hex.workspace = true
|
||||
humantime.workspace = true
|
||||
humantime-serde.workspace = true
|
||||
|
||||
@@ -12,7 +12,6 @@ anyhow.workspace = true
|
||||
async-stream.workspace = true
|
||||
clap = { workspace = true, features = ["string"] }
|
||||
futures.workspace = true
|
||||
git-version.workspace = true
|
||||
itertools.workspace = true
|
||||
once_cell.workspace = true
|
||||
pageserver_api.workspace = true
|
||||
|
||||
@@ -10,7 +10,6 @@ license.workspace = true
|
||||
anyhow.workspace = true
|
||||
camino.workspace = true
|
||||
clap = { workspace = true, features = ["string"] }
|
||||
git-version.workspace = true
|
||||
humantime.workspace = true
|
||||
pageserver = { path = ".." }
|
||||
pageserver_api.workspace = true
|
||||
|
||||
@@ -2,7 +2,7 @@ use std::collections::HashMap;
|
||||
|
||||
use futures::Future;
|
||||
use pageserver_api::{
|
||||
controller_api::NodeRegisterRequest,
|
||||
controller_api::{AvailabilityZone, NodeRegisterRequest},
|
||||
shard::TenantShardId,
|
||||
upcall_api::{
|
||||
ReAttachRequest, ReAttachResponse, ReAttachResponseTenant, ValidateRequest,
|
||||
@@ -148,10 +148,10 @@ impl ControlPlaneGenerationsApi for ControlPlaneClient {
|
||||
.and_then(|jv| jv.as_str().map(|str| str.to_owned()));
|
||||
|
||||
match az_id_from_metadata {
|
||||
Some(az_id) => Some(az_id),
|
||||
Some(az_id) => Some(AvailabilityZone(az_id)),
|
||||
None => {
|
||||
tracing::warn!("metadata.json does not contain an 'availability_zone_id' field");
|
||||
conf.availability_zone.clone()
|
||||
conf.availability_zone.clone().map(AvailabilityZone)
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
@@ -589,6 +589,10 @@ async fn timeline_create_handler(
|
||||
StatusCode::SERVICE_UNAVAILABLE,
|
||||
HttpErrorBody::from_msg(e.to_string()),
|
||||
),
|
||||
Err(e @ tenant::CreateTimelineError::AncestorArchived) => json_response(
|
||||
StatusCode::NOT_ACCEPTABLE,
|
||||
HttpErrorBody::from_msg(e.to_string()),
|
||||
),
|
||||
Err(tenant::CreateTimelineError::ShuttingDown) => json_response(
|
||||
StatusCode::SERVICE_UNAVAILABLE,
|
||||
HttpErrorBody::from_msg("tenant shutting down".to_string()),
|
||||
|
||||
@@ -3208,45 +3208,38 @@ pub(crate) mod tenant_throttling {
|
||||
|
||||
impl TimelineGet {
|
||||
pub(crate) fn new(tenant_shard_id: &TenantShardId) -> Self {
|
||||
let per_tenant_label_values = &[
|
||||
KIND,
|
||||
&tenant_shard_id.tenant_id.to_string(),
|
||||
&tenant_shard_id.shard_slug().to_string(),
|
||||
];
|
||||
TimelineGet {
|
||||
count_accounted_start: {
|
||||
GlobalAndPerTenantIntCounter {
|
||||
global: COUNT_ACCOUNTED_START.with_label_values(&[KIND]),
|
||||
per_tenant: COUNT_ACCOUNTED_START_PER_TENANT.with_label_values(&[
|
||||
KIND,
|
||||
&tenant_shard_id.tenant_id.to_string(),
|
||||
&tenant_shard_id.shard_slug().to_string(),
|
||||
]),
|
||||
per_tenant: COUNT_ACCOUNTED_START_PER_TENANT
|
||||
.with_label_values(per_tenant_label_values),
|
||||
}
|
||||
},
|
||||
count_accounted_finish: {
|
||||
GlobalAndPerTenantIntCounter {
|
||||
global: COUNT_ACCOUNTED_FINISH.with_label_values(&[KIND]),
|
||||
per_tenant: COUNT_ACCOUNTED_FINISH_PER_TENANT.with_label_values(&[
|
||||
KIND,
|
||||
&tenant_shard_id.tenant_id.to_string(),
|
||||
&tenant_shard_id.shard_slug().to_string(),
|
||||
]),
|
||||
per_tenant: COUNT_ACCOUNTED_FINISH_PER_TENANT
|
||||
.with_label_values(per_tenant_label_values),
|
||||
}
|
||||
},
|
||||
wait_time: {
|
||||
GlobalAndPerTenantIntCounter {
|
||||
global: WAIT_USECS.with_label_values(&[KIND]),
|
||||
per_tenant: WAIT_USECS_PER_TENANT.with_label_values(&[
|
||||
KIND,
|
||||
&tenant_shard_id.tenant_id.to_string(),
|
||||
&tenant_shard_id.shard_slug().to_string(),
|
||||
]),
|
||||
per_tenant: WAIT_USECS_PER_TENANT
|
||||
.with_label_values(per_tenant_label_values),
|
||||
}
|
||||
},
|
||||
count_throttled: {
|
||||
GlobalAndPerTenantIntCounter {
|
||||
global: WAIT_COUNT.with_label_values(&[KIND]),
|
||||
per_tenant: WAIT_COUNT_PER_TENANT.with_label_values(&[
|
||||
KIND,
|
||||
&tenant_shard_id.tenant_id.to_string(),
|
||||
&tenant_shard_id.shard_slug().to_string(),
|
||||
]),
|
||||
per_tenant: WAIT_COUNT_PER_TENANT
|
||||
.with_label_values(per_tenant_label_values),
|
||||
}
|
||||
},
|
||||
}
|
||||
|
||||
@@ -563,6 +563,8 @@ pub enum CreateTimelineError {
|
||||
AncestorLsn(anyhow::Error),
|
||||
#[error("ancestor timeline is not active")]
|
||||
AncestorNotActive,
|
||||
#[error("ancestor timeline is archived")]
|
||||
AncestorArchived,
|
||||
#[error("tenant shutting down")]
|
||||
ShuttingDown,
|
||||
#[error(transparent)]
|
||||
@@ -1698,6 +1700,11 @@ impl Tenant {
|
||||
return Err(CreateTimelineError::AncestorNotActive);
|
||||
}
|
||||
|
||||
if ancestor_timeline.is_archived() == Some(true) {
|
||||
info!("tried to branch archived timeline");
|
||||
return Err(CreateTimelineError::AncestorArchived);
|
||||
}
|
||||
|
||||
if let Some(lsn) = ancestor_start_lsn.as_mut() {
|
||||
*lsn = lsn.align();
|
||||
|
||||
@@ -1968,9 +1975,6 @@ impl Tenant {
|
||||
TenantState::Activating(_) | TenantState::Active | TenantState::Broken { .. } | TenantState::Stopping { .. } => {
|
||||
panic!("caller is responsible for calling activate() only on Loading / Attaching tenants, got {state:?}", state = current_state);
|
||||
}
|
||||
TenantState::Loading => {
|
||||
*current_state = TenantState::Activating(ActivatingFrom::Loading);
|
||||
}
|
||||
TenantState::Attaching => {
|
||||
*current_state = TenantState::Activating(ActivatingFrom::Attaching);
|
||||
}
|
||||
@@ -2151,7 +2155,7 @@ impl Tenant {
|
||||
async fn set_stopping(
|
||||
&self,
|
||||
progress: completion::Barrier,
|
||||
allow_transition_from_loading: bool,
|
||||
_allow_transition_from_loading: bool,
|
||||
allow_transition_from_attaching: bool,
|
||||
) -> Result<(), SetStoppingError> {
|
||||
let mut rx = self.state.subscribe();
|
||||
@@ -2166,7 +2170,6 @@ impl Tenant {
|
||||
);
|
||||
false
|
||||
}
|
||||
TenantState::Loading => allow_transition_from_loading,
|
||||
TenantState::Active | TenantState::Broken { .. } | TenantState::Stopping { .. } => true,
|
||||
})
|
||||
.await
|
||||
@@ -2185,13 +2188,6 @@ impl Tenant {
|
||||
*current_state = TenantState::Stopping { progress };
|
||||
true
|
||||
}
|
||||
TenantState::Loading => {
|
||||
if !allow_transition_from_loading {
|
||||
unreachable!("3we ensured above that we're done with activation, and, there is no re-activation")
|
||||
};
|
||||
*current_state = TenantState::Stopping { progress };
|
||||
true
|
||||
}
|
||||
TenantState::Active => {
|
||||
// FIXME: due to time-of-check vs time-of-use issues, it can happen that new timelines
|
||||
// are created after the transition to Stopping. That's harmless, as the Timelines
|
||||
@@ -2247,7 +2243,7 @@ impl Tenant {
|
||||
// The load & attach routines own the tenant state until it has reached `Active`.
|
||||
// So, wait until it's done.
|
||||
rx.wait_for(|state| match state {
|
||||
TenantState::Activating(_) | TenantState::Loading | TenantState::Attaching => {
|
||||
TenantState::Activating(_) | TenantState::Attaching => {
|
||||
info!(
|
||||
"waiting for {} to turn Active|Broken|Stopping",
|
||||
<&'static str>::from(state)
|
||||
@@ -2267,7 +2263,7 @@ impl Tenant {
|
||||
let reason = reason.to_string();
|
||||
self.state.send_modify(|current_state| {
|
||||
match *current_state {
|
||||
TenantState::Activating(_) | TenantState::Loading | TenantState::Attaching => {
|
||||
TenantState::Activating(_) | TenantState::Attaching => {
|
||||
unreachable!("we ensured above that we're done with activation, and, there is no re-activation")
|
||||
}
|
||||
TenantState::Active => {
|
||||
@@ -2311,7 +2307,7 @@ impl Tenant {
|
||||
loop {
|
||||
let current_state = receiver.borrow_and_update().clone();
|
||||
match current_state {
|
||||
TenantState::Loading | TenantState::Attaching | TenantState::Activating(_) => {
|
||||
TenantState::Attaching | TenantState::Activating(_) => {
|
||||
// in these states, there's a chance that we can reach ::Active
|
||||
self.activate_now();
|
||||
match timeout_cancellable(timeout, &self.cancel, receiver.changed()).await {
|
||||
@@ -4144,7 +4140,7 @@ pub(crate) mod harness {
|
||||
let walredo_mgr = Arc::new(WalRedoManager::from(TestRedoManager));
|
||||
|
||||
let tenant = Arc::new(Tenant::new(
|
||||
TenantState::Loading,
|
||||
TenantState::Attaching,
|
||||
self.conf,
|
||||
AttachedTenantConf::try_from(LocationConf::attached_single(
|
||||
TenantConfOpt::from(self.tenant_conf.clone()),
|
||||
|
||||
@@ -5,6 +5,7 @@ use itertools::Itertools;
|
||||
use super::storage_layer::LayerName;
|
||||
|
||||
/// Checks whether a layer map is valid (i.e., is a valid result of the current compaction algorithm if nothing goes wrong).
|
||||
///
|
||||
/// The function checks if we can split the LSN range of a delta layer only at the LSNs of the delta layers. For example,
|
||||
///
|
||||
/// ```plain
|
||||
|
||||
@@ -39,7 +39,7 @@ use crate::tenant::disk_btree::{
|
||||
use crate::tenant::storage_layer::layer::S3_UPLOAD_LIMIT;
|
||||
use crate::tenant::timeline::GetVectoredError;
|
||||
use crate::tenant::vectored_blob_io::{
|
||||
BlobFlag, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead,
|
||||
BlobFlag, BufView, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead,
|
||||
VectoredReadCoalesceMode, VectoredReadPlanner,
|
||||
};
|
||||
use crate::tenant::PageReconstructError;
|
||||
@@ -1021,13 +1021,30 @@ impl DeltaLayerInner {
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
let view = BufView::new_slice(&blobs_buf.buf);
|
||||
for meta in blobs_buf.blobs.iter().rev() {
|
||||
if Some(meta.meta.key) == ignore_key_with_err {
|
||||
continue;
|
||||
}
|
||||
let blob_read = meta.read(&view).await;
|
||||
let blob_read = match blob_read {
|
||||
Ok(buf) => buf,
|
||||
Err(e) => {
|
||||
reconstruct_state.on_key_error(
|
||||
meta.meta.key,
|
||||
PageReconstructError::Other(anyhow!(e).context(format!(
|
||||
"Failed to decompress blob from virtual file {}",
|
||||
self.file.path,
|
||||
))),
|
||||
);
|
||||
|
||||
ignore_key_with_err = Some(meta.meta.key);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
let value = Value::des(&blob_read);
|
||||
|
||||
let value = Value::des(&blobs_buf.buf[meta.start..meta.end]);
|
||||
let value = match value {
|
||||
Ok(v) => v,
|
||||
Err(e) => {
|
||||
@@ -1243,21 +1260,21 @@ impl DeltaLayerInner {
|
||||
buf.reserve(read.size());
|
||||
let res = reader.read_blobs(&read, buf, ctx).await?;
|
||||
|
||||
let view = BufView::new_slice(&res.buf);
|
||||
|
||||
for blob in res.blobs {
|
||||
let key = blob.meta.key;
|
||||
let lsn = blob.meta.lsn;
|
||||
let data = &res.buf[blob.start..blob.end];
|
||||
|
||||
let data = blob.read(&view).await?;
|
||||
|
||||
#[cfg(debug_assertions)]
|
||||
Value::des(data)
|
||||
Value::des(&data)
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"blob failed to deserialize for {}@{}, {}..{}: {:?}",
|
||||
blob.meta.key,
|
||||
blob.meta.lsn,
|
||||
blob.start,
|
||||
blob.end,
|
||||
utils::Hex(data)
|
||||
"blob failed to deserialize for {}: {:?}",
|
||||
blob,
|
||||
utils::Hex(&data)
|
||||
)
|
||||
})
|
||||
.unwrap();
|
||||
@@ -1265,15 +1282,15 @@ impl DeltaLayerInner {
|
||||
// is it an image or will_init walrecord?
|
||||
// FIXME: this could be handled by threading the BlobRef to the
|
||||
// VectoredReadBuilder
|
||||
let will_init = crate::repository::ValueBytes::will_init(data)
|
||||
let will_init = crate::repository::ValueBytes::will_init(&data)
|
||||
.inspect_err(|_e| {
|
||||
#[cfg(feature = "testing")]
|
||||
tracing::error!(data=?utils::Hex(data), err=?_e, %key, %lsn, "failed to parse will_init out of serialized value");
|
||||
tracing::error!(data=?utils::Hex(&data), err=?_e, %key, %lsn, "failed to parse will_init out of serialized value");
|
||||
})
|
||||
.unwrap_or(false);
|
||||
|
||||
per_blob_copy.clear();
|
||||
per_blob_copy.extend_from_slice(data);
|
||||
per_blob_copy.extend_from_slice(&data);
|
||||
|
||||
let (tmp, res) = writer
|
||||
.put_value_bytes(
|
||||
@@ -1538,8 +1555,11 @@ impl<'a> DeltaLayerIterator<'a> {
|
||||
.read_blobs(&plan, buf, self.ctx)
|
||||
.await?;
|
||||
let frozen_buf = blobs_buf.buf.freeze();
|
||||
let view = BufView::new_bytes(frozen_buf);
|
||||
for meta in blobs_buf.blobs.iter() {
|
||||
let value = Value::des(&frozen_buf[meta.start..meta.end])?;
|
||||
let blob_read = meta.read(&view).await?;
|
||||
let value = Value::des(&blob_read)?;
|
||||
|
||||
next_batch.push_back((meta.meta.key, meta.meta.lsn, value));
|
||||
}
|
||||
self.key_values_batch = next_batch;
|
||||
@@ -1916,9 +1936,13 @@ pub(crate) mod test {
|
||||
let blobs_buf = vectored_blob_reader
|
||||
.read_blobs(&read, buf.take().expect("Should have a buffer"), &ctx)
|
||||
.await?;
|
||||
let view = BufView::new_slice(&blobs_buf.buf);
|
||||
for meta in blobs_buf.blobs.iter() {
|
||||
let value = &blobs_buf.buf[meta.start..meta.end];
|
||||
assert_eq!(value, entries_meta.index[&(meta.meta.key, meta.meta.lsn)]);
|
||||
let value = meta.read(&view).await?;
|
||||
assert_eq!(
|
||||
&value[..],
|
||||
&entries_meta.index[&(meta.meta.key, meta.meta.lsn)]
|
||||
);
|
||||
}
|
||||
|
||||
buf = Some(blobs_buf.buf);
|
||||
|
||||
@@ -36,7 +36,8 @@ use crate::tenant::disk_btree::{
|
||||
};
|
||||
use crate::tenant::timeline::GetVectoredError;
|
||||
use crate::tenant::vectored_blob_io::{
|
||||
BlobFlag, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead, VectoredReadPlanner,
|
||||
BlobFlag, BufView, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead,
|
||||
VectoredReadPlanner,
|
||||
};
|
||||
use crate::tenant::PageReconstructError;
|
||||
use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt;
|
||||
@@ -547,15 +548,15 @@ impl ImageLayerInner {
|
||||
|
||||
let buf = BytesMut::with_capacity(buf_size);
|
||||
let blobs_buf = vectored_blob_reader.read_blobs(&read, buf, ctx).await?;
|
||||
|
||||
let frozen_buf = blobs_buf.buf.freeze();
|
||||
let view = BufView::new_bytes(frozen_buf);
|
||||
|
||||
for meta in blobs_buf.blobs.iter() {
|
||||
let img_buf = frozen_buf.slice(meta.start..meta.end);
|
||||
let img_buf = meta.read(&view).await?;
|
||||
|
||||
key_count += 1;
|
||||
writer
|
||||
.put_image(meta.meta.key, img_buf, ctx)
|
||||
.put_image(meta.meta.key, img_buf.into_bytes(), ctx)
|
||||
.await
|
||||
.context(format!("Storing key {}", meta.meta.key))?;
|
||||
}
|
||||
@@ -602,13 +603,28 @@ impl ImageLayerInner {
|
||||
match res {
|
||||
Ok(blobs_buf) => {
|
||||
let frozen_buf = blobs_buf.buf.freeze();
|
||||
|
||||
let view = BufView::new_bytes(frozen_buf);
|
||||
for meta in blobs_buf.blobs.iter() {
|
||||
let img_buf = frozen_buf.slice(meta.start..meta.end);
|
||||
let img_buf = meta.read(&view).await;
|
||||
|
||||
let img_buf = match img_buf {
|
||||
Ok(img_buf) => img_buf,
|
||||
Err(e) => {
|
||||
reconstruct_state.on_key_error(
|
||||
meta.meta.key,
|
||||
PageReconstructError::Other(anyhow!(e).context(format!(
|
||||
"Failed to decompress blob from virtual file {}",
|
||||
self.file.path,
|
||||
))),
|
||||
);
|
||||
|
||||
continue;
|
||||
}
|
||||
};
|
||||
reconstruct_state.update_key(
|
||||
&meta.meta.key,
|
||||
self.lsn,
|
||||
Value::Image(img_buf),
|
||||
Value::Image(img_buf.into_bytes()),
|
||||
);
|
||||
}
|
||||
}
|
||||
@@ -1025,10 +1041,15 @@ impl<'a> ImageLayerIterator<'a> {
|
||||
let blobs_buf = vectored_blob_reader
|
||||
.read_blobs(&plan, buf, self.ctx)
|
||||
.await?;
|
||||
let frozen_buf: Bytes = blobs_buf.buf.freeze();
|
||||
let frozen_buf = blobs_buf.buf.freeze();
|
||||
let view = BufView::new_bytes(frozen_buf);
|
||||
for meta in blobs_buf.blobs.iter() {
|
||||
let img_buf = frozen_buf.slice(meta.start..meta.end);
|
||||
next_batch.push_back((meta.meta.key, self.image_layer.lsn, Value::Image(img_buf)));
|
||||
let img_buf = meta.read(&view).await?;
|
||||
next_batch.push_back((
|
||||
meta.meta.key,
|
||||
self.image_layer.lsn,
|
||||
Value::Image(img_buf.into_bytes()),
|
||||
));
|
||||
}
|
||||
self.key_values_batch = next_batch;
|
||||
Ok(())
|
||||
|
||||
@@ -481,8 +481,7 @@ async fn ingest_housekeeping_loop(tenant: Arc<Tenant>, cancel: CancellationToken
|
||||
let allowed_rps = tenant.timeline_get_throttle.steady_rps();
|
||||
let delta = now - prev;
|
||||
info!(
|
||||
n_seconds=%format_args!("{:.3}",
|
||||
delta.as_secs_f64()),
|
||||
n_seconds=%format_args!("{:.3}", delta.as_secs_f64()),
|
||||
count_accounted = count_accounted_finish, // don't break existing log scraping
|
||||
count_throttled,
|
||||
sum_throttled_usecs,
|
||||
|
||||
@@ -112,7 +112,7 @@ use pageserver_api::reltag::RelTag;
|
||||
use pageserver_api::shard::ShardIndex;
|
||||
|
||||
use postgres_connection::PgConnectionConfig;
|
||||
use postgres_ffi::to_pg_timestamp;
|
||||
use postgres_ffi::{to_pg_timestamp, v14::xlog_utils, WAL_SEGMENT_SIZE};
|
||||
use utils::{
|
||||
completion,
|
||||
generation::Generation,
|
||||
@@ -1337,6 +1337,10 @@ impl Timeline {
|
||||
_ctx: &RequestContext,
|
||||
) -> anyhow::Result<LsnLease> {
|
||||
let lease = {
|
||||
// Normalize the requested LSN to be aligned, and move to the first record
|
||||
// if it points to the beginning of the page (header).
|
||||
let lsn = xlog_utils::normalize_lsn(lsn, WAL_SEGMENT_SIZE);
|
||||
|
||||
let mut gc_info = self.gc_info.write().unwrap();
|
||||
|
||||
let valid_until = SystemTime::now() + length;
|
||||
@@ -3597,7 +3601,7 @@ impl Timeline {
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
.map_err(|e| FlushLayerError::from_anyhow(self, e))?;
|
||||
.map_err(|e| FlushLayerError::from_anyhow(self, e.into()))?;
|
||||
|
||||
if self.cancel.is_cancelled() {
|
||||
return Err(FlushLayerError::Cancelled);
|
||||
@@ -3836,16 +3840,20 @@ impl Timeline {
|
||||
partition_size: u64,
|
||||
flags: EnumSet<CompactFlags>,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<((KeyPartitioning, SparseKeyPartitioning), Lsn)> {
|
||||
) -> Result<((KeyPartitioning, SparseKeyPartitioning), Lsn), CompactionError> {
|
||||
let Ok(mut partitioning_guard) = self.partitioning.try_lock() else {
|
||||
// NB: there are two callers, one is the compaction task, of which there is only one per struct Tenant and hence Timeline.
|
||||
// The other is the initdb optimization in flush_frozen_layer, used by `boostrap_timeline`, which runs before `.activate()`
|
||||
// and hence before the compaction task starts.
|
||||
anyhow::bail!("repartition() called concurrently, this should not happen");
|
||||
return Err(CompactionError::Other(anyhow!(
|
||||
"repartition() called concurrently, this should not happen"
|
||||
)));
|
||||
};
|
||||
let ((dense_partition, sparse_partition), partition_lsn) = &*partitioning_guard;
|
||||
if lsn < *partition_lsn {
|
||||
anyhow::bail!("repartition() called with LSN going backwards, this should not happen");
|
||||
return Err(CompactionError::Other(anyhow!(
|
||||
"repartition() called with LSN going backwards, this should not happen"
|
||||
)));
|
||||
}
|
||||
|
||||
let distance = lsn.0 - partition_lsn.0;
|
||||
@@ -4447,6 +4455,12 @@ pub(crate) enum CompactionError {
|
||||
Other(anyhow::Error),
|
||||
}
|
||||
|
||||
impl CompactionError {
|
||||
pub fn is_cancelled(&self) -> bool {
|
||||
matches!(self, CompactionError::ShuttingDown)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<CollectKeySpaceError> for CompactionError {
|
||||
fn from(err: CollectKeySpaceError) -> Self {
|
||||
match err {
|
||||
|
||||
@@ -390,7 +390,7 @@ impl Timeline {
|
||||
// error but continue.
|
||||
//
|
||||
// Suppress error when it's due to cancellation
|
||||
if !self.cancel.is_cancelled() {
|
||||
if !self.cancel.is_cancelled() && !err.is_cancelled() {
|
||||
tracing::error!("could not compact, repartitioning keyspace failed: {err:?}");
|
||||
}
|
||||
(1, false)
|
||||
|
||||
@@ -30,8 +30,8 @@ use crate::{
|
||||
pgdatadir_mapping::CollectKeySpaceError,
|
||||
task_mgr::{self, TaskKind, BACKGROUND_RUNTIME},
|
||||
tenant::{
|
||||
storage_layer::LayerVisibilityHint, tasks::BackgroundLoopKind, timeline::EvictionError,
|
||||
LogicalSizeCalculationCause, Tenant,
|
||||
size::CalculateSyntheticSizeError, storage_layer::LayerVisibilityHint,
|
||||
tasks::BackgroundLoopKind, timeline::EvictionError, LogicalSizeCalculationCause, Tenant,
|
||||
},
|
||||
};
|
||||
|
||||
@@ -557,6 +557,8 @@ impl Timeline {
|
||||
gather_result = gather => {
|
||||
match gather_result {
|
||||
Ok(_) => {},
|
||||
// It can happen sometimes that we hit this instead of the cancellation token firing above
|
||||
Err(CalculateSyntheticSizeError::Cancelled) => {}
|
||||
Err(e) => {
|
||||
// We don't care about the result, but, if it failed, we should log it,
|
||||
// since consumption metric might be hitting the cached value and
|
||||
|
||||
@@ -16,8 +16,9 @@
|
||||
//! Note that the vectored blob api does *not* go through the page cache.
|
||||
|
||||
use std::collections::BTreeMap;
|
||||
use std::ops::Deref;
|
||||
|
||||
use bytes::BytesMut;
|
||||
use bytes::{Bytes, BytesMut};
|
||||
use pageserver_api::key::Key;
|
||||
use tokio::io::AsyncWriteExt;
|
||||
use tokio_epoll_uring::BoundedBuf;
|
||||
@@ -35,11 +36,123 @@ pub struct BlobMeta {
|
||||
pub lsn: Lsn,
|
||||
}
|
||||
|
||||
/// Blob offsets into [`VectoredBlobsBuf::buf`]
|
||||
/// A view into the vectored blobs read buffer.
|
||||
#[derive(Clone, Debug)]
|
||||
pub(crate) enum BufView<'a> {
|
||||
Slice(&'a [u8]),
|
||||
Bytes(bytes::Bytes),
|
||||
}
|
||||
|
||||
impl<'a> BufView<'a> {
|
||||
/// Creates a new slice-based view on the blob.
|
||||
pub fn new_slice(slice: &'a [u8]) -> Self {
|
||||
Self::Slice(slice)
|
||||
}
|
||||
|
||||
/// Creates a new [`bytes::Bytes`]-based view on the blob.
|
||||
pub fn new_bytes(bytes: bytes::Bytes) -> Self {
|
||||
Self::Bytes(bytes)
|
||||
}
|
||||
|
||||
/// Convert the view into `Bytes`.
|
||||
///
|
||||
/// If using slice as the underlying storage, the copy will be an O(n) operation.
|
||||
pub fn into_bytes(self) -> Bytes {
|
||||
match self {
|
||||
BufView::Slice(slice) => Bytes::copy_from_slice(slice),
|
||||
BufView::Bytes(bytes) => bytes,
|
||||
}
|
||||
}
|
||||
|
||||
/// Creates a sub-view of the blob based on the range.
|
||||
fn view(&self, range: std::ops::Range<usize>) -> Self {
|
||||
match self {
|
||||
BufView::Slice(slice) => BufView::Slice(&slice[range]),
|
||||
BufView::Bytes(bytes) => BufView::Bytes(bytes.slice(range)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> Deref for BufView<'a> {
|
||||
type Target = [u8];
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
match self {
|
||||
BufView::Slice(slice) => slice,
|
||||
BufView::Bytes(bytes) => bytes,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> AsRef<[u8]> for BufView<'a> {
|
||||
fn as_ref(&self) -> &[u8] {
|
||||
match self {
|
||||
BufView::Slice(slice) => slice,
|
||||
BufView::Bytes(bytes) => bytes.as_ref(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> From<&'a [u8]> for BufView<'a> {
|
||||
fn from(value: &'a [u8]) -> Self {
|
||||
Self::new_slice(value)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<Bytes> for BufView<'_> {
|
||||
fn from(value: Bytes) -> Self {
|
||||
Self::new_bytes(value)
|
||||
}
|
||||
}
|
||||
|
||||
/// Blob offsets into [`VectoredBlobsBuf::buf`]. The byte ranges is potentially compressed,
|
||||
/// subject to [`VectoredBlob::compression_bits`].
|
||||
pub struct VectoredBlob {
|
||||
pub start: usize,
|
||||
pub end: usize,
|
||||
/// Blob metadata.
|
||||
pub meta: BlobMeta,
|
||||
/// Start offset.
|
||||
start: usize,
|
||||
/// End offset.
|
||||
end: usize,
|
||||
/// Compression used on the the blob.
|
||||
compression_bits: u8,
|
||||
}
|
||||
|
||||
impl VectoredBlob {
|
||||
/// Reads a decompressed view of the blob.
|
||||
pub(crate) async fn read<'a>(&self, buf: &BufView<'a>) -> Result<BufView<'a>, std::io::Error> {
|
||||
let view = buf.view(self.start..self.end);
|
||||
|
||||
match self.compression_bits {
|
||||
BYTE_UNCOMPRESSED => Ok(view),
|
||||
BYTE_ZSTD => {
|
||||
let mut decompressed_vec = Vec::new();
|
||||
let mut decoder =
|
||||
async_compression::tokio::write::ZstdDecoder::new(&mut decompressed_vec);
|
||||
decoder.write_all(&view).await?;
|
||||
decoder.flush().await?;
|
||||
// Zero-copy conversion from `Vec` to `Bytes`
|
||||
Ok(BufView::new_bytes(Bytes::from(decompressed_vec)))
|
||||
}
|
||||
bits => {
|
||||
let error = std::io::Error::new(
|
||||
std::io::ErrorKind::InvalidData,
|
||||
format!("Failed to decompress blob for {}@{}, {}..{}: invalid compression byte {bits:x}", self.meta.key, self.meta.lsn, self.start, self.end),
|
||||
);
|
||||
Err(error)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl std::fmt::Display for VectoredBlob {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(
|
||||
f,
|
||||
"{}@{}, {}..{}",
|
||||
self.meta.key, self.meta.lsn, self.start, self.end
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
/// Return type of [`VectoredBlobReader::read_blobs`]
|
||||
@@ -514,7 +627,7 @@ impl<'a> VectoredBlobReader<'a> {
|
||||
);
|
||||
}
|
||||
|
||||
let mut buf = self
|
||||
let buf = self
|
||||
.file
|
||||
.read_exact_at(buf.slice(0..read.size()), read.start, ctx)
|
||||
.await?
|
||||
@@ -529,9 +642,6 @@ impl<'a> VectoredBlobReader<'a> {
|
||||
// of a blob is implicit: the start of the next blob if one exists
|
||||
// or the end of the read.
|
||||
|
||||
// Some scratch space, put here for reusing the allocation
|
||||
let mut decompressed_vec = Vec::new();
|
||||
|
||||
for (blob_start, meta) in blobs_at {
|
||||
let blob_start_in_buf = blob_start - start_offset;
|
||||
let first_len_byte = buf[blob_start_in_buf as usize];
|
||||
@@ -557,35 +667,14 @@ impl<'a> VectoredBlobReader<'a> {
|
||||
)
|
||||
};
|
||||
|
||||
let start_raw = blob_start_in_buf + size_length;
|
||||
let end_raw = start_raw + blob_size;
|
||||
let (start, end);
|
||||
if compression_bits == BYTE_UNCOMPRESSED {
|
||||
start = start_raw as usize;
|
||||
end = end_raw as usize;
|
||||
} else if compression_bits == BYTE_ZSTD {
|
||||
let mut decoder =
|
||||
async_compression::tokio::write::ZstdDecoder::new(&mut decompressed_vec);
|
||||
decoder
|
||||
.write_all(&buf[start_raw as usize..end_raw as usize])
|
||||
.await?;
|
||||
decoder.flush().await?;
|
||||
start = buf.len();
|
||||
buf.extend_from_slice(&decompressed_vec);
|
||||
end = buf.len();
|
||||
decompressed_vec.clear();
|
||||
} else {
|
||||
let error = std::io::Error::new(
|
||||
std::io::ErrorKind::InvalidData,
|
||||
format!("invalid compression byte {compression_bits:x}"),
|
||||
);
|
||||
return Err(error);
|
||||
}
|
||||
let start = (blob_start_in_buf + size_length) as usize;
|
||||
let end = start + blob_size as usize;
|
||||
|
||||
metas.push(VectoredBlob {
|
||||
start,
|
||||
end,
|
||||
meta: *meta,
|
||||
compression_bits,
|
||||
});
|
||||
}
|
||||
|
||||
@@ -1020,8 +1109,13 @@ mod tests {
|
||||
let result = vectored_blob_reader.read_blobs(&read, buf, &ctx).await?;
|
||||
assert_eq!(result.blobs.len(), 1);
|
||||
let read_blob = &result.blobs[0];
|
||||
let read_buf = &result.buf[read_blob.start..read_blob.end];
|
||||
assert_eq!(blob, read_buf, "mismatch for idx={idx} at offset={offset}");
|
||||
let view = BufView::new_slice(&result.buf);
|
||||
let read_buf = read_blob.read(&view).await?;
|
||||
assert_eq!(
|
||||
&blob[..],
|
||||
&read_buf[..],
|
||||
"mismatch for idx={idx} at offset={offset}"
|
||||
);
|
||||
buf = result.buf;
|
||||
}
|
||||
Ok(())
|
||||
|
||||
@@ -25,7 +25,18 @@ SHLIB_LINK_INTERNAL = $(libpq)
|
||||
SHLIB_LINK = -lcurl
|
||||
|
||||
EXTENSION = neon
|
||||
DATA = neon--1.0.sql neon--1.0--1.1.sql neon--1.1--1.2.sql neon--1.2--1.3.sql neon--1.3--1.2.sql neon--1.2--1.1.sql neon--1.1--1.0.sql neon--1.3--1.4.sql neon--1.4--1.3.sql neon--1.4--1.5.sql neon--1.5--1.4.sql
|
||||
DATA = \
|
||||
neon--1.0.sql \
|
||||
neon--1.0--1.1.sql \
|
||||
neon--1.1--1.2.sql \
|
||||
neon--1.2--1.3.sql \
|
||||
neon--1.3--1.4.sql \
|
||||
neon--1.4--1.5.sql \
|
||||
neon--1.5--1.4.sql \
|
||||
neon--1.4--1.3.sql \
|
||||
neon--1.3--1.2.sql \
|
||||
neon--1.2--1.1.sql \
|
||||
neon--1.1--1.0.sql
|
||||
PGFILEDESC = "neon - cloud storage for PostgreSQL"
|
||||
|
||||
EXTRA_CLEAN = \
|
||||
|
||||
@@ -32,7 +32,7 @@ NeonPerfCountersShmemSize(void)
|
||||
return size;
|
||||
}
|
||||
|
||||
bool
|
||||
void
|
||||
NeonPerfCountersShmemInit(void)
|
||||
{
|
||||
bool found;
|
||||
|
||||
@@ -105,7 +105,7 @@ extern neon_per_backend_counters *neon_per_backend_counters_shared;
|
||||
extern void inc_getpage_wait(uint64 latency);
|
||||
|
||||
extern Size NeonPerfCountersShmemSize(void);
|
||||
extern bool NeonPerfCountersShmemInit(void);
|
||||
extern void NeonPerfCountersShmemInit(void);
|
||||
|
||||
|
||||
#endif /* NEON_PERF_COUNTERS_H */
|
||||
|
||||
@@ -29,7 +29,6 @@ dashmap.workspace = true
|
||||
env_logger.workspace = true
|
||||
framed-websockets.workspace = true
|
||||
futures.workspace = true
|
||||
git-version.workspace = true
|
||||
hashbrown.workspace = true
|
||||
hashlink.workspace = true
|
||||
hex.workspace = true
|
||||
|
||||
@@ -21,7 +21,6 @@ chrono.workspace = true
|
||||
clap = { workspace = true, features = ["derive"] }
|
||||
crc32c.workspace = true
|
||||
fail.workspace = true
|
||||
git-version.workspace = true
|
||||
hex.workspace = true
|
||||
humantime.workspace = true
|
||||
hyper.workspace = true
|
||||
|
||||
@@ -15,7 +15,6 @@ const_format.workspace = true
|
||||
futures.workspace = true
|
||||
futures-core.workspace = true
|
||||
futures-util.workspace = true
|
||||
git-version.workspace = true
|
||||
humantime.workspace = true
|
||||
hyper = { workspace = true, features = ["full"] }
|
||||
once_cell.workspace = true
|
||||
|
||||
@@ -20,7 +20,6 @@ chrono.workspace = true
|
||||
clap.workspace = true
|
||||
fail.workspace = true
|
||||
futures.workspace = true
|
||||
git-version.workspace = true
|
||||
hex.workspace = true
|
||||
hyper.workspace = true
|
||||
humantime.workspace = true
|
||||
|
||||
@@ -515,7 +515,7 @@ async fn handle_tenant_timeline_passthrough(
|
||||
tracing::info!("Proxying request for tenant {} ({})", tenant_id, path);
|
||||
|
||||
// Find the node that holds shard zero
|
||||
let (node, tenant_shard_id) = service.tenant_shard0_node(tenant_id)?;
|
||||
let (node, tenant_shard_id) = service.tenant_shard0_node(tenant_id).await?;
|
||||
|
||||
// Callers will always pass an unsharded tenant ID. Before proxying, we must
|
||||
// rewrite this to a shard-aware shard zero ID.
|
||||
@@ -545,10 +545,10 @@ async fn handle_tenant_timeline_passthrough(
|
||||
let _timer = latency.start_timer(labels.clone());
|
||||
|
||||
let client = mgmt_api::Client::new(node.base_url(), service.get_config().jwt_token.as_deref());
|
||||
let resp = client.get_raw(path).await.map_err(|_e|
|
||||
// FIXME: give APiError a proper Unavailable variant. We return 503 here because
|
||||
// if we can't successfully send a request to the pageserver, we aren't available.
|
||||
ApiError::ShuttingDown)?;
|
||||
let resp = client.get_raw(path).await.map_err(|e|
|
||||
// We return 503 here because if we can't successfully send a request to the pageserver,
|
||||
// either we aren't available or the pageserver is unavailable.
|
||||
ApiError::ResourceUnavailable(format!("Error sending pageserver API request to {node}: {e}").into()))?;
|
||||
|
||||
if !resp.status().is_success() {
|
||||
let error_counter = &METRICS_REGISTRY
|
||||
@@ -557,6 +557,19 @@ async fn handle_tenant_timeline_passthrough(
|
||||
error_counter.inc(labels);
|
||||
}
|
||||
|
||||
// Transform 404 into 503 if we raced with a migration
|
||||
if resp.status() == reqwest::StatusCode::NOT_FOUND {
|
||||
// Look up node again: if we migrated it will be different
|
||||
let (new_node, _tenant_shard_id) = service.tenant_shard0_node(tenant_id).await?;
|
||||
if new_node.get_id() != node.get_id() {
|
||||
// Rather than retry here, send the client a 503 to prompt a retry: this matches
|
||||
// the pageserver's use of 503, and all clients calling this API should retry on 503.
|
||||
return Err(ApiError::ResourceUnavailable(
|
||||
format!("Pageserver {node} returned 404, was migrated to {new_node}").into(),
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
// We have a reqest::Response, would like a http::Response
|
||||
let mut builder = hyper::Response::builder().status(map_reqwest_hyper_status(resp.status())?);
|
||||
for (k, v) in resp.headers() {
|
||||
|
||||
@@ -2,8 +2,8 @@ use std::{str::FromStr, time::Duration};
|
||||
|
||||
use pageserver_api::{
|
||||
controller_api::{
|
||||
NodeAvailability, NodeDescribeResponse, NodeRegisterRequest, NodeSchedulingPolicy,
|
||||
TenantLocateResponseShard,
|
||||
AvailabilityZone, NodeAvailability, NodeDescribeResponse, NodeRegisterRequest,
|
||||
NodeSchedulingPolicy, TenantLocateResponseShard,
|
||||
},
|
||||
shard::TenantShardId,
|
||||
};
|
||||
@@ -36,7 +36,7 @@ pub(crate) struct Node {
|
||||
listen_pg_addr: String,
|
||||
listen_pg_port: u16,
|
||||
|
||||
availability_zone_id: String,
|
||||
availability_zone_id: AvailabilityZone,
|
||||
|
||||
// This cancellation token means "stop any RPCs in flight to this node, and don't start
|
||||
// any more". It is not related to process shutdown.
|
||||
@@ -64,8 +64,8 @@ impl Node {
|
||||
}
|
||||
|
||||
#[allow(unused)]
|
||||
pub(crate) fn get_availability_zone_id(&self) -> &str {
|
||||
self.availability_zone_id.as_str()
|
||||
pub(crate) fn get_availability_zone_id(&self) -> &AvailabilityZone {
|
||||
&self.availability_zone_id
|
||||
}
|
||||
|
||||
pub(crate) fn get_scheduling(&self) -> NodeSchedulingPolicy {
|
||||
@@ -181,7 +181,7 @@ impl Node {
|
||||
listen_http_port: u16,
|
||||
listen_pg_addr: String,
|
||||
listen_pg_port: u16,
|
||||
availability_zone_id: String,
|
||||
availability_zone_id: AvailabilityZone,
|
||||
) -> Self {
|
||||
Self {
|
||||
id,
|
||||
@@ -204,7 +204,7 @@ impl Node {
|
||||
listen_http_port: self.listen_http_port as i32,
|
||||
listen_pg_addr: self.listen_pg_addr.clone(),
|
||||
listen_pg_port: self.listen_pg_port as i32,
|
||||
availability_zone_id: self.availability_zone_id.clone(),
|
||||
availability_zone_id: self.availability_zone_id.0.clone(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -219,7 +219,7 @@ impl Node {
|
||||
listen_http_port: np.listen_http_port as u16,
|
||||
listen_pg_addr: np.listen_pg_addr,
|
||||
listen_pg_port: np.listen_pg_port as u16,
|
||||
availability_zone_id: np.availability_zone_id,
|
||||
availability_zone_id: AvailabilityZone(np.availability_zone_id),
|
||||
cancel: CancellationToken::new(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -9,6 +9,7 @@ use diesel::pg::PgConnection;
|
||||
use diesel::prelude::*;
|
||||
use diesel::Connection;
|
||||
use itertools::Itertools;
|
||||
use pageserver_api::controller_api::AvailabilityZone;
|
||||
use pageserver_api::controller_api::MetadataHealthRecord;
|
||||
use pageserver_api::controller_api::ShardSchedulingPolicy;
|
||||
use pageserver_api::controller_api::{NodeSchedulingPolicy, PlacementPolicy};
|
||||
@@ -667,8 +668,8 @@ impl Persistence {
|
||||
|
||||
pub(crate) async fn set_tenant_shard_preferred_azs(
|
||||
&self,
|
||||
preferred_azs: Vec<(TenantShardId, String)>,
|
||||
) -> DatabaseResult<Vec<(TenantShardId, String)>> {
|
||||
preferred_azs: Vec<(TenantShardId, AvailabilityZone)>,
|
||||
) -> DatabaseResult<Vec<(TenantShardId, AvailabilityZone)>> {
|
||||
use crate::schema::tenant_shards::dsl::*;
|
||||
|
||||
self.with_measured_conn(DatabaseOperation::SetPreferredAzs, move |conn| {
|
||||
@@ -679,7 +680,7 @@ impl Persistence {
|
||||
.filter(tenant_id.eq(tenant_shard_id.tenant_id.to_string()))
|
||||
.filter(shard_number.eq(tenant_shard_id.shard_number.0 as i32))
|
||||
.filter(shard_count.eq(tenant_shard_id.shard_count.literal() as i32))
|
||||
.set(preferred_az_id.eq(preferred_az))
|
||||
.set(preferred_az_id.eq(preferred_az.0.clone()))
|
||||
.execute(conn)?;
|
||||
|
||||
if updated == 1 {
|
||||
|
||||
@@ -463,7 +463,7 @@ impl Reconciler {
|
||||
for (timeline_id, baseline_lsn) in &baseline {
|
||||
match latest.get(timeline_id) {
|
||||
Some(latest_lsn) => {
|
||||
tracing::info!("🕑 LSN origin {baseline_lsn} vs destination {latest_lsn}");
|
||||
tracing::info!(timeline_id = %timeline_id, "🕑 LSN origin {baseline_lsn} vs destination {latest_lsn}");
|
||||
if latest_lsn < baseline_lsn {
|
||||
any_behind = true;
|
||||
}
|
||||
@@ -541,6 +541,8 @@ impl Reconciler {
|
||||
}
|
||||
}
|
||||
|
||||
pausable_failpoint!("reconciler-live-migrate-pre-generation-inc");
|
||||
|
||||
// Increment generation before attaching to new pageserver
|
||||
self.generation = Some(
|
||||
self.persistence
|
||||
@@ -617,6 +619,8 @@ impl Reconciler {
|
||||
},
|
||||
);
|
||||
|
||||
pausable_failpoint!("reconciler-live-migrate-post-detach");
|
||||
|
||||
tracing::info!("🔁 Switching to AttachedSingle mode on node {dest_ps}",);
|
||||
let dest_final_conf = build_location_config(
|
||||
&self.shard,
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
use crate::{node::Node, tenant_shard::TenantShard};
|
||||
use itertools::Itertools;
|
||||
use pageserver_api::models::PageserverUtilization;
|
||||
use pageserver_api::{controller_api::AvailabilityZone, models::PageserverUtilization};
|
||||
use serde::Serialize;
|
||||
use std::{collections::HashMap, fmt::Debug};
|
||||
use utils::{http::error::ApiError, id::NodeId};
|
||||
@@ -32,6 +32,8 @@ pub(crate) struct SchedulerNode {
|
||||
shard_count: usize,
|
||||
/// How many shards are currently attached on this node, via their [`crate::tenant_shard::IntentState`].
|
||||
attached_shard_count: usize,
|
||||
/// Availability zone id in which the node resides
|
||||
az: AvailabilityZone,
|
||||
|
||||
/// Whether this node is currently elegible to have new shards scheduled (this is derived
|
||||
/// from a node's availability state and scheduling policy).
|
||||
@@ -42,6 +44,7 @@ pub(crate) trait NodeSchedulingScore: Debug + Ord + Copy + Sized {
|
||||
fn generate(
|
||||
node_id: &NodeId,
|
||||
node: &mut SchedulerNode,
|
||||
preferred_az: &Option<AvailabilityZone>,
|
||||
context: &ScheduleContext,
|
||||
) -> Option<Self>;
|
||||
fn is_overloaded(&self) -> bool;
|
||||
@@ -62,6 +65,72 @@ impl ShardTag for SecondaryShardTag {
|
||||
type Score = NodeSecondarySchedulingScore;
|
||||
}
|
||||
|
||||
#[derive(PartialEq, Eq, Debug, Clone, Copy)]
|
||||
enum AzMatch {
|
||||
Yes,
|
||||
No,
|
||||
Unknown,
|
||||
}
|
||||
|
||||
impl AzMatch {
|
||||
fn new(node_az: &AvailabilityZone, shard_preferred_az: Option<&AvailabilityZone>) -> Self {
|
||||
match shard_preferred_az {
|
||||
Some(preferred_az) if preferred_az == node_az => Self::Yes,
|
||||
Some(_preferred_az) => Self::No,
|
||||
None => Self::Unknown,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(PartialEq, Eq, Debug, Clone, Copy)]
|
||||
struct AttachmentAzMatch(AzMatch);
|
||||
|
||||
impl Ord for AttachmentAzMatch {
|
||||
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
|
||||
// Lower scores indicate a more suitable node.
|
||||
// Note that we prefer a node for which we don't have
|
||||
// info to a node which we are certain doesn't match the
|
||||
// preferred AZ of the shard.
|
||||
let az_match_score = |az_match: &AzMatch| match az_match {
|
||||
AzMatch::Yes => 0,
|
||||
AzMatch::Unknown => 1,
|
||||
AzMatch::No => 2,
|
||||
};
|
||||
|
||||
az_match_score(&self.0).cmp(&az_match_score(&other.0))
|
||||
}
|
||||
}
|
||||
|
||||
impl PartialOrd for AttachmentAzMatch {
|
||||
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
|
||||
Some(self.cmp(other))
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(PartialEq, Eq, Debug, Clone, Copy)]
|
||||
struct SecondaryAzMatch(AzMatch);
|
||||
|
||||
impl Ord for SecondaryAzMatch {
|
||||
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
|
||||
// Lower scores indicate a more suitable node.
|
||||
// For secondary locations we wish to avoid the preferred AZ
|
||||
// of the shard.
|
||||
let az_match_score = |az_match: &AzMatch| match az_match {
|
||||
AzMatch::No => 0,
|
||||
AzMatch::Unknown => 1,
|
||||
AzMatch::Yes => 2,
|
||||
};
|
||||
|
||||
az_match_score(&self.0).cmp(&az_match_score(&other.0))
|
||||
}
|
||||
}
|
||||
|
||||
impl PartialOrd for SecondaryAzMatch {
|
||||
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
|
||||
Some(self.cmp(other))
|
||||
}
|
||||
}
|
||||
|
||||
/// Scheduling score of a given node for shard attachments.
|
||||
/// Lower scores indicate more suitable nodes.
|
||||
/// Ordering is given by member declaration order (top to bottom).
|
||||
@@ -70,6 +139,10 @@ pub(crate) struct NodeAttachmentSchedulingScore {
|
||||
/// The number of shards belonging to the tenant currently being
|
||||
/// scheduled that are attached to this node.
|
||||
affinity_score: AffinityScore,
|
||||
/// Flag indicating whether this node matches the preferred AZ
|
||||
/// of the shard. For equal affinity scores, nodes in the matching AZ
|
||||
/// are considered first.
|
||||
az_match: AttachmentAzMatch,
|
||||
/// Size of [`ScheduleContext::attached_nodes`] for the current node.
|
||||
/// This normally tracks the number of attached shards belonging to the
|
||||
/// tenant being scheduled that are already on this node.
|
||||
@@ -87,6 +160,7 @@ impl NodeSchedulingScore for NodeAttachmentSchedulingScore {
|
||||
fn generate(
|
||||
node_id: &NodeId,
|
||||
node: &mut SchedulerNode,
|
||||
preferred_az: &Option<AvailabilityZone>,
|
||||
context: &ScheduleContext,
|
||||
) -> Option<Self> {
|
||||
let utilization = match &mut node.may_schedule {
|
||||
@@ -102,6 +176,7 @@ impl NodeSchedulingScore for NodeAttachmentSchedulingScore {
|
||||
.get(node_id)
|
||||
.copied()
|
||||
.unwrap_or(AffinityScore::FREE),
|
||||
az_match: AttachmentAzMatch(AzMatch::new(&node.az, preferred_az.as_ref())),
|
||||
attached_shards_in_context: context.attached_nodes.get(node_id).copied().unwrap_or(0),
|
||||
utilization_score: utilization.cached_score(),
|
||||
total_attached_shard_count: node.attached_shard_count,
|
||||
@@ -123,6 +198,11 @@ impl NodeSchedulingScore for NodeAttachmentSchedulingScore {
|
||||
/// Ordering is given by member declaration order (top to bottom).
|
||||
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy)]
|
||||
pub(crate) struct NodeSecondarySchedulingScore {
|
||||
/// Flag indicating whether this node matches the preferred AZ
|
||||
/// of the shard. For secondary locations we wish to avoid nodes in.
|
||||
/// the preferred AZ of the shard, since that's where the attached location
|
||||
/// should be scheduled and having the secondary in the same AZ is bad for HA.
|
||||
az_match: SecondaryAzMatch,
|
||||
/// The number of shards belonging to the tenant currently being
|
||||
/// scheduled that are attached to this node.
|
||||
affinity_score: AffinityScore,
|
||||
@@ -139,6 +219,7 @@ impl NodeSchedulingScore for NodeSecondarySchedulingScore {
|
||||
fn generate(
|
||||
node_id: &NodeId,
|
||||
node: &mut SchedulerNode,
|
||||
preferred_az: &Option<AvailabilityZone>,
|
||||
context: &ScheduleContext,
|
||||
) -> Option<Self> {
|
||||
let utilization = match &mut node.may_schedule {
|
||||
@@ -149,6 +230,7 @@ impl NodeSchedulingScore for NodeSecondarySchedulingScore {
|
||||
};
|
||||
|
||||
Some(Self {
|
||||
az_match: SecondaryAzMatch(AzMatch::new(&node.az, preferred_az.as_ref())),
|
||||
affinity_score: context
|
||||
.nodes
|
||||
.get(node_id)
|
||||
@@ -179,6 +261,7 @@ impl PartialEq for SchedulerNode {
|
||||
may_schedule_matches
|
||||
&& self.shard_count == other.shard_count
|
||||
&& self.attached_shard_count == other.attached_shard_count
|
||||
&& self.az == other.az
|
||||
}
|
||||
}
|
||||
|
||||
@@ -293,6 +376,7 @@ impl Scheduler {
|
||||
shard_count: 0,
|
||||
attached_shard_count: 0,
|
||||
may_schedule: node.may_schedule(),
|
||||
az: node.get_availability_zone_id().clone(),
|
||||
},
|
||||
);
|
||||
}
|
||||
@@ -319,6 +403,7 @@ impl Scheduler {
|
||||
shard_count: 0,
|
||||
attached_shard_count: 0,
|
||||
may_schedule: node.may_schedule(),
|
||||
az: node.get_availability_zone_id().clone(),
|
||||
},
|
||||
);
|
||||
}
|
||||
@@ -497,6 +582,7 @@ impl Scheduler {
|
||||
shard_count: 0,
|
||||
attached_shard_count: 0,
|
||||
may_schedule: node.may_schedule(),
|
||||
az: node.get_availability_zone_id().clone(),
|
||||
});
|
||||
}
|
||||
}
|
||||
@@ -542,6 +628,7 @@ impl Scheduler {
|
||||
fn compute_node_scores<Score>(
|
||||
&mut self,
|
||||
hard_exclude: &[NodeId],
|
||||
preferred_az: &Option<AvailabilityZone>,
|
||||
context: &ScheduleContext,
|
||||
) -> Vec<Score>
|
||||
where
|
||||
@@ -553,7 +640,7 @@ impl Scheduler {
|
||||
if hard_exclude.contains(k) {
|
||||
None
|
||||
} else {
|
||||
Score::generate(k, v, context)
|
||||
Score::generate(k, v, preferred_az, context)
|
||||
}
|
||||
})
|
||||
.collect()
|
||||
@@ -571,13 +658,15 @@ impl Scheduler {
|
||||
pub(crate) fn schedule_shard<Tag: ShardTag>(
|
||||
&mut self,
|
||||
hard_exclude: &[NodeId],
|
||||
preferred_az: &Option<AvailabilityZone>,
|
||||
context: &ScheduleContext,
|
||||
) -> Result<NodeId, ScheduleError> {
|
||||
if self.nodes.is_empty() {
|
||||
return Err(ScheduleError::NoPageservers);
|
||||
}
|
||||
|
||||
let mut scores = self.compute_node_scores::<Tag::Score>(hard_exclude, context);
|
||||
let mut scores =
|
||||
self.compute_node_scores::<Tag::Score>(hard_exclude, preferred_az, context);
|
||||
|
||||
// Exclude nodes whose utilization is critically high, if there are alternatives available. This will
|
||||
// cause us to violate affinity rules if it is necessary to avoid critically overloading nodes: for example
|
||||
@@ -634,6 +723,12 @@ impl Scheduler {
|
||||
Ok(node_id)
|
||||
}
|
||||
|
||||
/// Selects any available node. This is suitable for performing background work (e.g. S3
|
||||
/// deletions).
|
||||
pub(crate) fn any_available_node(&mut self) -> Result<NodeId, ScheduleError> {
|
||||
self.schedule_shard::<AttachedShardTag>(&[], &None, &ScheduleContext::default())
|
||||
}
|
||||
|
||||
/// Unit test access to internal state
|
||||
#[cfg(test)]
|
||||
pub(crate) fn get_node_shard_count(&self, node_id: NodeId) -> usize {
|
||||
@@ -650,13 +745,22 @@ impl Scheduler {
|
||||
pub(crate) mod test_utils {
|
||||
|
||||
use crate::node::Node;
|
||||
use pageserver_api::{controller_api::NodeAvailability, models::utilization::test_utilization};
|
||||
use pageserver_api::{
|
||||
controller_api::{AvailabilityZone, NodeAvailability},
|
||||
models::utilization::test_utilization,
|
||||
};
|
||||
use std::collections::HashMap;
|
||||
use utils::id::NodeId;
|
||||
|
||||
/// Test helper: synthesize the requested number of nodes, all in active state.
|
||||
///
|
||||
/// Node IDs start at one.
|
||||
pub(crate) fn make_test_nodes(n: u64) -> HashMap<NodeId, Node> {
|
||||
///
|
||||
/// The `azs` argument specifies the list of availability zones which will be assigned
|
||||
/// to nodes in round-robin fashion. If empy, a default AZ is assigned.
|
||||
pub(crate) fn make_test_nodes(n: u64, azs: &[AvailabilityZone]) -> HashMap<NodeId, Node> {
|
||||
let mut az_iter = azs.iter().cycle();
|
||||
|
||||
(1..n + 1)
|
||||
.map(|i| {
|
||||
(NodeId(i), {
|
||||
@@ -666,7 +770,10 @@ pub(crate) mod test_utils {
|
||||
80 + i as u16,
|
||||
format!("pghost-{i}"),
|
||||
5432 + i as u16,
|
||||
"test-az".to_string(),
|
||||
az_iter
|
||||
.next()
|
||||
.cloned()
|
||||
.unwrap_or(AvailabilityZone("test-az".to_string())),
|
||||
);
|
||||
node.set_availability(NodeAvailability::Active(test_utilization::simple(0, 0)));
|
||||
assert!(node.is_available());
|
||||
@@ -686,7 +793,7 @@ mod tests {
|
||||
use crate::tenant_shard::IntentState;
|
||||
#[test]
|
||||
fn scheduler_basic() -> anyhow::Result<()> {
|
||||
let nodes = test_utils::make_test_nodes(2);
|
||||
let nodes = test_utils::make_test_nodes(2, &[]);
|
||||
|
||||
let mut scheduler = Scheduler::new(nodes.values());
|
||||
let mut t1_intent = IntentState::new();
|
||||
@@ -694,9 +801,9 @@ mod tests {
|
||||
|
||||
let context = ScheduleContext::default();
|
||||
|
||||
let scheduled = scheduler.schedule_shard::<AttachedShardTag>(&[], &context)?;
|
||||
let scheduled = scheduler.schedule_shard::<AttachedShardTag>(&[], &None, &context)?;
|
||||
t1_intent.set_attached(&mut scheduler, Some(scheduled));
|
||||
let scheduled = scheduler.schedule_shard::<AttachedShardTag>(&[], &context)?;
|
||||
let scheduled = scheduler.schedule_shard::<AttachedShardTag>(&[], &None, &context)?;
|
||||
t2_intent.set_attached(&mut scheduler, Some(scheduled));
|
||||
|
||||
assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 1);
|
||||
@@ -705,8 +812,11 @@ mod tests {
|
||||
assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 1);
|
||||
assert_eq!(scheduler.get_node_attached_shard_count(NodeId(2)), 1);
|
||||
|
||||
let scheduled =
|
||||
scheduler.schedule_shard::<AttachedShardTag>(&t1_intent.all_pageservers(), &context)?;
|
||||
let scheduled = scheduler.schedule_shard::<AttachedShardTag>(
|
||||
&t1_intent.all_pageservers(),
|
||||
&None,
|
||||
&context,
|
||||
)?;
|
||||
t1_intent.push_secondary(&mut scheduler, scheduled);
|
||||
|
||||
assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 1);
|
||||
@@ -746,7 +856,7 @@ mod tests {
|
||||
#[test]
|
||||
/// Test the PageserverUtilization's contribution to scheduling algorithm
|
||||
fn scheduler_utilization() {
|
||||
let mut nodes = test_utils::make_test_nodes(3);
|
||||
let mut nodes = test_utils::make_test_nodes(3, &[]);
|
||||
let mut scheduler = Scheduler::new(nodes.values());
|
||||
|
||||
// Need to keep these alive because they contribute to shard counts via RAII
|
||||
@@ -761,7 +871,7 @@ mod tests {
|
||||
context: &ScheduleContext,
|
||||
) {
|
||||
let scheduled = scheduler
|
||||
.schedule_shard::<AttachedShardTag>(&[], context)
|
||||
.schedule_shard::<AttachedShardTag>(&[], &None, context)
|
||||
.unwrap();
|
||||
let mut intent = IntentState::new();
|
||||
intent.set_attached(scheduler, Some(scheduled));
|
||||
@@ -870,4 +980,98 @@ mod tests {
|
||||
intent.clear(&mut scheduler);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
/// A simple test that showcases AZ-aware scheduling and its interaction with
|
||||
/// affinity scores.
|
||||
fn az_scheduling() {
|
||||
let az_a_tag = AvailabilityZone("az-a".to_string());
|
||||
let az_b_tag = AvailabilityZone("az-b".to_string());
|
||||
|
||||
let nodes = test_utils::make_test_nodes(3, &[az_a_tag.clone(), az_b_tag.clone()]);
|
||||
let mut scheduler = Scheduler::new(nodes.values());
|
||||
|
||||
// Need to keep these alive because they contribute to shard counts via RAII
|
||||
let mut scheduled_intents = Vec::new();
|
||||
|
||||
let mut context = ScheduleContext::default();
|
||||
|
||||
fn assert_scheduler_chooses<Tag: ShardTag>(
|
||||
expect_node: NodeId,
|
||||
preferred_az: Option<AvailabilityZone>,
|
||||
scheduled_intents: &mut Vec<IntentState>,
|
||||
scheduler: &mut Scheduler,
|
||||
context: &mut ScheduleContext,
|
||||
) {
|
||||
let scheduled = scheduler
|
||||
.schedule_shard::<Tag>(&[], &preferred_az, context)
|
||||
.unwrap();
|
||||
let mut intent = IntentState::new();
|
||||
intent.set_attached(scheduler, Some(scheduled));
|
||||
scheduled_intents.push(intent);
|
||||
assert_eq!(scheduled, expect_node);
|
||||
|
||||
context.avoid(&[scheduled]);
|
||||
}
|
||||
|
||||
assert_scheduler_chooses::<AttachedShardTag>(
|
||||
NodeId(1),
|
||||
Some(az_a_tag.clone()),
|
||||
&mut scheduled_intents,
|
||||
&mut scheduler,
|
||||
&mut context,
|
||||
);
|
||||
|
||||
// Node 2 and 3 have affinity score equal to 0, but node 3
|
||||
// is in "az-a" so we prefer that.
|
||||
assert_scheduler_chooses::<AttachedShardTag>(
|
||||
NodeId(3),
|
||||
Some(az_a_tag.clone()),
|
||||
&mut scheduled_intents,
|
||||
&mut scheduler,
|
||||
&mut context,
|
||||
);
|
||||
|
||||
// Node 2 is not in "az-a", but it has the lowest affinity so we prefer that.
|
||||
assert_scheduler_chooses::<AttachedShardTag>(
|
||||
NodeId(2),
|
||||
Some(az_a_tag.clone()),
|
||||
&mut scheduled_intents,
|
||||
&mut scheduler,
|
||||
&mut context,
|
||||
);
|
||||
|
||||
// Avoid nodes in "az-a" for the secondary location.
|
||||
assert_scheduler_chooses::<SecondaryShardTag>(
|
||||
NodeId(2),
|
||||
Some(az_a_tag.clone()),
|
||||
&mut scheduled_intents,
|
||||
&mut scheduler,
|
||||
&mut context,
|
||||
);
|
||||
|
||||
// Avoid nodes in "az-b" for the secondary location.
|
||||
// Nodes 1 and 3 are identically loaded, so prefer the lowest node id.
|
||||
assert_scheduler_chooses::<SecondaryShardTag>(
|
||||
NodeId(1),
|
||||
Some(az_b_tag.clone()),
|
||||
&mut scheduled_intents,
|
||||
&mut scheduler,
|
||||
&mut context,
|
||||
);
|
||||
|
||||
// Avoid nodes in "az-b" for the secondary location.
|
||||
// Node 3 has lower affinity score than 1, so prefer that.
|
||||
assert_scheduler_chooses::<SecondaryShardTag>(
|
||||
NodeId(3),
|
||||
Some(az_b_tag.clone()),
|
||||
&mut scheduled_intents,
|
||||
&mut scheduler,
|
||||
&mut context,
|
||||
);
|
||||
|
||||
for mut intent in scheduled_intents {
|
||||
intent.clear(&mut scheduler);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -26,7 +26,7 @@ use crate::{
|
||||
ShardGenerationState, TenantFilter,
|
||||
},
|
||||
reconciler::{ReconcileError, ReconcileUnits, ReconcilerConfig, ReconcilerConfigBuilder},
|
||||
scheduler::{AttachedShardTag, MaySchedule, ScheduleContext, ScheduleError, ScheduleMode},
|
||||
scheduler::{MaySchedule, ScheduleContext, ScheduleError, ScheduleMode},
|
||||
tenant_shard::{
|
||||
MigrateAttachment, ReconcileNeeded, ReconcilerStatus, ScheduleOptimization,
|
||||
ScheduleOptimizationAction,
|
||||
@@ -1265,6 +1265,8 @@ impl Service {
|
||||
|
||||
#[cfg(feature = "testing")]
|
||||
{
|
||||
use pageserver_api::controller_api::AvailabilityZone;
|
||||
|
||||
// Hack: insert scheduler state for all nodes referenced by shards, as compatibility
|
||||
// tests only store the shards, not the nodes. The nodes will be loaded shortly
|
||||
// after when pageservers start up and register.
|
||||
@@ -1282,7 +1284,7 @@ impl Service {
|
||||
123,
|
||||
"".to_string(),
|
||||
123,
|
||||
"test_az".to_string(),
|
||||
AvailabilityZone("test_az".to_string()),
|
||||
);
|
||||
|
||||
scheduler.node_upsert(&node);
|
||||
@@ -2099,7 +2101,7 @@ impl Service {
|
||||
let az_id = locked
|
||||
.nodes
|
||||
.get(&resp.node_id)
|
||||
.map(|n| n.get_availability_zone_id().to_string())?;
|
||||
.map(|n| n.get_availability_zone_id().clone())?;
|
||||
|
||||
Some((resp.shard_id, az_id))
|
||||
})
|
||||
@@ -2629,8 +2631,7 @@ impl Service {
|
||||
let scheduler = &mut locked.scheduler;
|
||||
// Right now we only perform the operation on a single node without parallelization
|
||||
// TODO fan out the operation to multiple nodes for better performance
|
||||
let node_id =
|
||||
scheduler.schedule_shard::<AttachedShardTag>(&[], &ScheduleContext::default())?;
|
||||
let node_id = scheduler.any_available_node()?;
|
||||
let node = locked
|
||||
.nodes
|
||||
.get(&node_id)
|
||||
@@ -2816,8 +2817,7 @@ impl Service {
|
||||
|
||||
// Pick an arbitrary node to use for remote deletions (does not have to be where the tenant
|
||||
// was attached, just has to be able to see the S3 content)
|
||||
let node_id =
|
||||
scheduler.schedule_shard::<AttachedShardTag>(&[], &ScheduleContext::default())?;
|
||||
let node_id = scheduler.any_available_node()?;
|
||||
let node = nodes
|
||||
.get(&node_id)
|
||||
.expect("Pageservers may not be deleted while lock is active");
|
||||
@@ -3508,34 +3508,66 @@ impl Service {
|
||||
|
||||
/// When you need to send an HTTP request to the pageserver that holds shard0 of a tenant, this
|
||||
/// function looks up and returns node. If the tenant isn't found, returns Err(ApiError::NotFound)
|
||||
pub(crate) fn tenant_shard0_node(
|
||||
pub(crate) async fn tenant_shard0_node(
|
||||
&self,
|
||||
tenant_id: TenantId,
|
||||
) -> Result<(Node, TenantShardId), ApiError> {
|
||||
let locked = self.inner.read().unwrap();
|
||||
let Some((tenant_shard_id, shard)) = locked
|
||||
.tenants
|
||||
.range(TenantShardId::tenant_range(tenant_id))
|
||||
.next()
|
||||
// Look up in-memory state and maybe use the node from there.
|
||||
{
|
||||
let locked = self.inner.read().unwrap();
|
||||
let Some((tenant_shard_id, shard)) = locked
|
||||
.tenants
|
||||
.range(TenantShardId::tenant_range(tenant_id))
|
||||
.next()
|
||||
else {
|
||||
return Err(ApiError::NotFound(
|
||||
anyhow::anyhow!("Tenant {tenant_id} not found").into(),
|
||||
));
|
||||
};
|
||||
|
||||
let Some(intent_node_id) = shard.intent.get_attached() else {
|
||||
tracing::warn!(
|
||||
tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(),
|
||||
"Shard not scheduled (policy {:?}), cannot generate pass-through URL",
|
||||
shard.policy
|
||||
);
|
||||
return Err(ApiError::Conflict(
|
||||
"Cannot call timeline API on non-attached tenant".to_string(),
|
||||
));
|
||||
};
|
||||
|
||||
if shard.reconciler.is_none() {
|
||||
// Optimization: while no reconcile is in flight, we may trust our in-memory state
|
||||
// to tell us which pageserver to use. Otherwise we will fall through and hit the database
|
||||
let Some(node) = locked.nodes.get(intent_node_id) else {
|
||||
// This should never happen
|
||||
return Err(ApiError::InternalServerError(anyhow::anyhow!(
|
||||
"Shard refers to nonexistent node"
|
||||
)));
|
||||
};
|
||||
return Ok((node.clone(), *tenant_shard_id));
|
||||
}
|
||||
};
|
||||
|
||||
// Look up the latest attached pageserver location from the database
|
||||
// generation state: this will reflect the progress of any ongoing migration.
|
||||
// Note that it is not guaranteed to _stay_ here, our caller must still handle
|
||||
// the case where they call through to the pageserver and get a 404.
|
||||
let db_result = self.persistence.tenant_generations(tenant_id).await?;
|
||||
let Some(ShardGenerationState {
|
||||
tenant_shard_id,
|
||||
generation: _,
|
||||
generation_pageserver: Some(node_id),
|
||||
}) = db_result.first()
|
||||
else {
|
||||
return Err(ApiError::NotFound(
|
||||
anyhow::anyhow!("Tenant {tenant_id} not found").into(),
|
||||
// This can happen if we raced with a tenant deletion or a shard split. On a retry
|
||||
// the caller will either succeed (shard split case), get a proper 404 (deletion case),
|
||||
// or a conflict response (case where tenant was detached in background)
|
||||
return Err(ApiError::ResourceUnavailable(
|
||||
"Shard {} not found in database, or is not attached".into(),
|
||||
));
|
||||
};
|
||||
|
||||
// TODO: should use the ID last published to compute_hook, rather than the intent: the intent might
|
||||
// point to somewhere we haven't attached yet.
|
||||
let Some(node_id) = shard.intent.get_attached() else {
|
||||
tracing::warn!(
|
||||
tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(),
|
||||
"Shard not scheduled (policy {:?}), cannot generate pass-through URL",
|
||||
shard.policy
|
||||
);
|
||||
return Err(ApiError::Conflict(
|
||||
"Cannot call timeline API on non-attached tenant".to_string(),
|
||||
));
|
||||
};
|
||||
|
||||
let locked = self.inner.read().unwrap();
|
||||
let Some(node) = locked.nodes.get(node_id) else {
|
||||
// This should never happen
|
||||
return Err(ApiError::InternalServerError(anyhow::anyhow!(
|
||||
@@ -4481,7 +4513,7 @@ impl Service {
|
||||
let az_id = locked
|
||||
.nodes
|
||||
.get(node_id)
|
||||
.map(|n| n.get_availability_zone_id().to_string())?;
|
||||
.map(|n| n.get_availability_zone_id().clone())?;
|
||||
|
||||
Some((*tid, az_id))
|
||||
})
|
||||
|
||||
@@ -15,7 +15,7 @@ use crate::{
|
||||
service::ReconcileResultRequest,
|
||||
};
|
||||
use pageserver_api::controller_api::{
|
||||
NodeSchedulingPolicy, PlacementPolicy, ShardSchedulingPolicy,
|
||||
AvailabilityZone, NodeSchedulingPolicy, PlacementPolicy, ShardSchedulingPolicy,
|
||||
};
|
||||
use pageserver_api::{
|
||||
models::{LocationConfig, LocationConfigMode, TenantConfig},
|
||||
@@ -146,7 +146,7 @@ pub(crate) struct TenantShard {
|
||||
|
||||
// We should attempt to schedule this shard in the provided AZ to
|
||||
// decrease chances of cross-AZ compute.
|
||||
preferred_az_id: Option<String>,
|
||||
preferred_az_id: Option<AvailabilityZone>,
|
||||
}
|
||||
|
||||
#[derive(Default, Clone, Debug, Serialize)]
|
||||
@@ -540,14 +540,22 @@ impl TenantShard {
|
||||
Ok((true, promote_secondary))
|
||||
} else {
|
||||
// Pick a fresh node: either we had no secondaries or none were schedulable
|
||||
let node_id =
|
||||
scheduler.schedule_shard::<AttachedShardTag>(&self.intent.secondary, context)?;
|
||||
let node_id = scheduler.schedule_shard::<AttachedShardTag>(
|
||||
&self.intent.secondary,
|
||||
&self.preferred_az_id,
|
||||
context,
|
||||
)?;
|
||||
tracing::debug!("Selected {} as attached", node_id);
|
||||
self.intent.set_attached(scheduler, Some(node_id));
|
||||
Ok((true, node_id))
|
||||
}
|
||||
}
|
||||
|
||||
#[instrument(skip_all, fields(
|
||||
tenant_id=%self.tenant_shard_id.tenant_id,
|
||||
shard_id=%self.tenant_shard_id.shard_slug(),
|
||||
sequence=%self.sequence
|
||||
))]
|
||||
pub(crate) fn schedule(
|
||||
&mut self,
|
||||
scheduler: &mut Scheduler,
|
||||
@@ -617,8 +625,11 @@ impl TenantShard {
|
||||
|
||||
let mut used_pageservers = vec![attached_node_id];
|
||||
while self.intent.secondary.len() < secondary_count {
|
||||
let node_id = scheduler
|
||||
.schedule_shard::<SecondaryShardTag>(&used_pageservers, context)?;
|
||||
let node_id = scheduler.schedule_shard::<SecondaryShardTag>(
|
||||
&used_pageservers,
|
||||
&self.preferred_az_id,
|
||||
context,
|
||||
)?;
|
||||
self.intent.push_secondary(scheduler, node_id);
|
||||
used_pageservers.push(node_id);
|
||||
modified = true;
|
||||
@@ -631,7 +642,11 @@ impl TenantShard {
|
||||
modified = true;
|
||||
} else if self.intent.secondary.is_empty() {
|
||||
// Populate secondary by scheduling a fresh node
|
||||
let node_id = scheduler.schedule_shard::<SecondaryShardTag>(&[], context)?;
|
||||
let node_id = scheduler.schedule_shard::<SecondaryShardTag>(
|
||||
&[],
|
||||
&self.preferred_az_id,
|
||||
context,
|
||||
)?;
|
||||
self.intent.push_secondary(scheduler, node_id);
|
||||
modified = true;
|
||||
}
|
||||
@@ -810,6 +825,7 @@ impl TenantShard {
|
||||
// with lower utilization.
|
||||
let Ok(candidate_node) = scheduler.schedule_shard::<SecondaryShardTag>(
|
||||
&self.intent.all_pageservers(),
|
||||
&self.preferred_az_id,
|
||||
schedule_context,
|
||||
) else {
|
||||
// A scheduling error means we have no possible candidate replacements
|
||||
@@ -1308,7 +1324,7 @@ impl TenantShard {
|
||||
pending_compute_notification: false,
|
||||
delayed_reconcile: false,
|
||||
scheduling_policy: serde_json::from_str(&tsp.scheduling_policy).unwrap(),
|
||||
preferred_az_id: tsp.preferred_az_id,
|
||||
preferred_az_id: tsp.preferred_az_id.map(AvailabilityZone),
|
||||
})
|
||||
}
|
||||
|
||||
@@ -1324,15 +1340,15 @@ impl TenantShard {
|
||||
config: serde_json::to_string(&self.config).unwrap(),
|
||||
splitting: SplitState::default(),
|
||||
scheduling_policy: serde_json::to_string(&self.scheduling_policy).unwrap(),
|
||||
preferred_az_id: self.preferred_az_id.clone(),
|
||||
preferred_az_id: self.preferred_az_id.as_ref().map(|az| az.0.clone()),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn preferred_az(&self) -> Option<&str> {
|
||||
self.preferred_az_id.as_deref()
|
||||
pub(crate) fn preferred_az(&self) -> Option<&AvailabilityZone> {
|
||||
self.preferred_az_id.as_ref()
|
||||
}
|
||||
|
||||
pub(crate) fn set_preferred_az(&mut self, preferred_az_id: String) {
|
||||
pub(crate) fn set_preferred_az(&mut self, preferred_az_id: AvailabilityZone) {
|
||||
self.preferred_az_id = Some(preferred_az_id);
|
||||
}
|
||||
}
|
||||
@@ -1345,6 +1361,7 @@ pub(crate) mod tests {
|
||||
controller_api::NodeAvailability,
|
||||
shard::{ShardCount, ShardNumber},
|
||||
};
|
||||
use rand::{rngs::StdRng, SeedableRng};
|
||||
use utils::id::TenantId;
|
||||
|
||||
use crate::scheduler::test_utils::make_test_nodes;
|
||||
@@ -1373,7 +1390,11 @@ pub(crate) mod tests {
|
||||
)
|
||||
}
|
||||
|
||||
fn make_test_tenant(policy: PlacementPolicy, shard_count: ShardCount) -> Vec<TenantShard> {
|
||||
fn make_test_tenant(
|
||||
policy: PlacementPolicy,
|
||||
shard_count: ShardCount,
|
||||
preferred_az: Option<AvailabilityZone>,
|
||||
) -> Vec<TenantShard> {
|
||||
let tenant_id = TenantId::generate();
|
||||
|
||||
(0..shard_count.count())
|
||||
@@ -1385,7 +1406,7 @@ pub(crate) mod tests {
|
||||
shard_number,
|
||||
shard_count,
|
||||
};
|
||||
TenantShard::new(
|
||||
let mut ts = TenantShard::new(
|
||||
tenant_shard_id,
|
||||
ShardIdentity::new(
|
||||
shard_number,
|
||||
@@ -1394,7 +1415,13 @@ pub(crate) mod tests {
|
||||
)
|
||||
.unwrap(),
|
||||
policy.clone(),
|
||||
)
|
||||
);
|
||||
|
||||
if let Some(az) = &preferred_az {
|
||||
ts.set_preferred_az(az.clone());
|
||||
}
|
||||
|
||||
ts
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
@@ -1405,7 +1432,7 @@ pub(crate) mod tests {
|
||||
fn tenant_ha_scheduling() -> anyhow::Result<()> {
|
||||
// Start with three nodes. Our tenant will only use two. The third one is
|
||||
// expected to remain unused.
|
||||
let mut nodes = make_test_nodes(3);
|
||||
let mut nodes = make_test_nodes(3, &[]);
|
||||
|
||||
let mut scheduler = Scheduler::new(nodes.values());
|
||||
let mut context = ScheduleContext::default();
|
||||
@@ -1457,7 +1484,7 @@ pub(crate) mod tests {
|
||||
|
||||
#[test]
|
||||
fn intent_from_observed() -> anyhow::Result<()> {
|
||||
let nodes = make_test_nodes(3);
|
||||
let nodes = make_test_nodes(3, &[]);
|
||||
let mut scheduler = Scheduler::new(nodes.values());
|
||||
|
||||
let mut tenant_shard = make_test_tenant_shard(PlacementPolicy::Attached(1));
|
||||
@@ -1507,7 +1534,7 @@ pub(crate) mod tests {
|
||||
|
||||
#[test]
|
||||
fn scheduling_mode() -> anyhow::Result<()> {
|
||||
let nodes = make_test_nodes(3);
|
||||
let nodes = make_test_nodes(3, &[]);
|
||||
let mut scheduler = Scheduler::new(nodes.values());
|
||||
|
||||
let mut tenant_shard = make_test_tenant_shard(PlacementPolicy::Attached(1));
|
||||
@@ -1532,7 +1559,7 @@ pub(crate) mod tests {
|
||||
|
||||
#[test]
|
||||
fn optimize_attachment() -> anyhow::Result<()> {
|
||||
let nodes = make_test_nodes(3);
|
||||
let nodes = make_test_nodes(3, &[]);
|
||||
let mut scheduler = Scheduler::new(nodes.values());
|
||||
|
||||
let mut shard_a = make_test_tenant_shard(PlacementPolicy::Attached(1));
|
||||
@@ -1599,7 +1626,7 @@ pub(crate) mod tests {
|
||||
|
||||
#[test]
|
||||
fn optimize_secondary() -> anyhow::Result<()> {
|
||||
let nodes = make_test_nodes(4);
|
||||
let nodes = make_test_nodes(4, &[]);
|
||||
let mut scheduler = Scheduler::new(nodes.values());
|
||||
|
||||
let mut shard_a = make_test_tenant_shard(PlacementPolicy::Attached(1));
|
||||
@@ -1698,14 +1725,14 @@ pub(crate) mod tests {
|
||||
/// that it converges.
|
||||
#[test]
|
||||
fn optimize_add_nodes() -> anyhow::Result<()> {
|
||||
let nodes = make_test_nodes(4);
|
||||
let nodes = make_test_nodes(4, &[]);
|
||||
|
||||
// Only show the scheduler a couple of nodes
|
||||
let mut scheduler = Scheduler::new([].iter());
|
||||
scheduler.node_upsert(nodes.get(&NodeId(1)).unwrap());
|
||||
scheduler.node_upsert(nodes.get(&NodeId(2)).unwrap());
|
||||
|
||||
let mut shards = make_test_tenant(PlacementPolicy::Attached(1), ShardCount::new(4));
|
||||
let mut shards = make_test_tenant(PlacementPolicy::Attached(1), ShardCount::new(4), None);
|
||||
let mut schedule_context = ScheduleContext::default();
|
||||
for shard in &mut shards {
|
||||
assert!(shard
|
||||
@@ -1754,16 +1781,16 @@ pub(crate) mod tests {
|
||||
fn initial_scheduling_is_optimal() -> anyhow::Result<()> {
|
||||
use itertools::Itertools;
|
||||
|
||||
let nodes = make_test_nodes(2);
|
||||
let nodes = make_test_nodes(2, &[]);
|
||||
|
||||
let mut scheduler = Scheduler::new([].iter());
|
||||
scheduler.node_upsert(nodes.get(&NodeId(1)).unwrap());
|
||||
scheduler.node_upsert(nodes.get(&NodeId(2)).unwrap());
|
||||
|
||||
let mut a = make_test_tenant(PlacementPolicy::Attached(1), ShardCount::new(4));
|
||||
let mut a = make_test_tenant(PlacementPolicy::Attached(1), ShardCount::new(4), None);
|
||||
let a_context = Rc::new(RefCell::new(ScheduleContext::default()));
|
||||
|
||||
let mut b = make_test_tenant(PlacementPolicy::Attached(1), ShardCount::new(4));
|
||||
let mut b = make_test_tenant(PlacementPolicy::Attached(1), ShardCount::new(4), None);
|
||||
let b_context = Rc::new(RefCell::new(ScheduleContext::default()));
|
||||
|
||||
let a_shards_with_context = a.iter_mut().map(|shard| (shard, a_context.clone()));
|
||||
@@ -1788,4 +1815,147 @@ pub(crate) mod tests {
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn random_az_shard_scheduling() -> anyhow::Result<()> {
|
||||
use rand::seq::SliceRandom;
|
||||
|
||||
for seed in 0..50 {
|
||||
eprintln!("Running test with seed {seed}");
|
||||
let mut rng = StdRng::seed_from_u64(seed);
|
||||
|
||||
let az_a_tag = AvailabilityZone("az-a".to_string());
|
||||
let az_b_tag = AvailabilityZone("az-b".to_string());
|
||||
let azs = [az_a_tag, az_b_tag];
|
||||
let nodes = make_test_nodes(4, &azs);
|
||||
let mut shards_per_az: HashMap<AvailabilityZone, u32> = HashMap::new();
|
||||
|
||||
let mut scheduler = Scheduler::new([].iter());
|
||||
for node in nodes.values() {
|
||||
scheduler.node_upsert(node);
|
||||
}
|
||||
|
||||
let mut shards = Vec::default();
|
||||
let mut contexts = Vec::default();
|
||||
let mut az_picker = azs.iter().cycle().cloned();
|
||||
for i in 0..100 {
|
||||
let az = az_picker.next().unwrap();
|
||||
let shard_count = i % 4 + 1;
|
||||
*shards_per_az.entry(az.clone()).or_default() += shard_count;
|
||||
|
||||
let tenant_shards = make_test_tenant(
|
||||
PlacementPolicy::Attached(1),
|
||||
ShardCount::new(shard_count.try_into().unwrap()),
|
||||
Some(az),
|
||||
);
|
||||
let context = Rc::new(RefCell::new(ScheduleContext::default()));
|
||||
|
||||
contexts.push(context.clone());
|
||||
let with_ctx = tenant_shards
|
||||
.into_iter()
|
||||
.map(|shard| (shard, context.clone()));
|
||||
for shard_with_ctx in with_ctx {
|
||||
shards.push(shard_with_ctx);
|
||||
}
|
||||
}
|
||||
|
||||
shards.shuffle(&mut rng);
|
||||
|
||||
#[derive(Default, Debug)]
|
||||
struct NodeStats {
|
||||
attachments: u32,
|
||||
secondaries: u32,
|
||||
}
|
||||
|
||||
let mut node_stats: HashMap<NodeId, NodeStats> = HashMap::default();
|
||||
let mut attachments_in_wrong_az = 0;
|
||||
let mut secondaries_in_wrong_az = 0;
|
||||
|
||||
for (shard, context) in &mut shards {
|
||||
let context = &mut *context.borrow_mut();
|
||||
shard.schedule(&mut scheduler, context).unwrap();
|
||||
|
||||
let attached_node = shard.intent.get_attached().unwrap();
|
||||
let stats = node_stats.entry(attached_node).or_default();
|
||||
stats.attachments += 1;
|
||||
|
||||
let secondary_node = *shard.intent.get_secondary().first().unwrap();
|
||||
let stats = node_stats.entry(secondary_node).or_default();
|
||||
stats.secondaries += 1;
|
||||
|
||||
let attached_node_az = nodes
|
||||
.get(&attached_node)
|
||||
.unwrap()
|
||||
.get_availability_zone_id();
|
||||
let secondary_node_az = nodes
|
||||
.get(&secondary_node)
|
||||
.unwrap()
|
||||
.get_availability_zone_id();
|
||||
let preferred_az = shard.preferred_az().unwrap();
|
||||
|
||||
if attached_node_az != preferred_az {
|
||||
eprintln!(
|
||||
"{} attachment was scheduled in AZ {} but preferred AZ {}",
|
||||
shard.tenant_shard_id, attached_node_az, preferred_az
|
||||
);
|
||||
attachments_in_wrong_az += 1;
|
||||
}
|
||||
|
||||
if secondary_node_az == preferred_az {
|
||||
eprintln!(
|
||||
"{} secondary was scheduled in AZ {} which matches preference",
|
||||
shard.tenant_shard_id, attached_node_az
|
||||
);
|
||||
secondaries_in_wrong_az += 1;
|
||||
}
|
||||
}
|
||||
|
||||
let mut violations = Vec::default();
|
||||
|
||||
if attachments_in_wrong_az > 0 {
|
||||
violations.push(format!(
|
||||
"{} attachments scheduled to the incorrect AZ",
|
||||
attachments_in_wrong_az
|
||||
));
|
||||
}
|
||||
|
||||
if secondaries_in_wrong_az > 0 {
|
||||
violations.push(format!(
|
||||
"{} secondaries scheduled to the incorrect AZ",
|
||||
secondaries_in_wrong_az
|
||||
));
|
||||
}
|
||||
|
||||
eprintln!(
|
||||
"attachments_in_wrong_az={} secondaries_in_wrong_az={}",
|
||||
attachments_in_wrong_az, secondaries_in_wrong_az
|
||||
);
|
||||
|
||||
for (node_id, stats) in &node_stats {
|
||||
let node_az = nodes.get(node_id).unwrap().get_availability_zone_id();
|
||||
let ideal_attachment_load = shards_per_az.get(node_az).unwrap() / 2;
|
||||
let allowed_attachment_load =
|
||||
(ideal_attachment_load - 1)..(ideal_attachment_load + 2);
|
||||
|
||||
if !allowed_attachment_load.contains(&stats.attachments) {
|
||||
violations.push(format!(
|
||||
"Found {} attachments on node {}, but expected {}",
|
||||
stats.attachments, node_id, ideal_attachment_load
|
||||
));
|
||||
}
|
||||
|
||||
eprintln!(
|
||||
"{}: attachments={} secondaries={} ideal_attachment_load={}",
|
||||
node_id, stats.attachments, stats.secondaries, ideal_attachment_load
|
||||
);
|
||||
}
|
||||
|
||||
assert!(violations.is_empty(), "{violations:?}");
|
||||
|
||||
for (mut shard, _ctx) in shards {
|
||||
shard.intent.clear(&mut scheduler);
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -8,7 +8,6 @@ license.workspace = true
|
||||
aws-sdk-s3.workspace = true
|
||||
either.workspace = true
|
||||
anyhow.workspace = true
|
||||
git-version.workspace = true
|
||||
hex.workspace = true
|
||||
humantime.workspace = true
|
||||
serde.workspace = true
|
||||
|
||||
@@ -642,9 +642,6 @@ class NeonEnvBuilder:
|
||||
patch_script = ""
|
||||
for ps in self.env.pageservers:
|
||||
patch_script += f"UPDATE nodes SET listen_http_port={ps.service_port.http}, listen_pg_port={ps.service_port.pg} WHERE node_id = '{ps.id}';"
|
||||
# This is a temporary to get the backward compat test happy
|
||||
# since the compat snapshot was generated with an older version of neon local
|
||||
patch_script += f"UPDATE nodes SET availability_zone_id='{ps.az_id}' WHERE node_id = '{ps.id}' AND availability_zone_id IS NULL;"
|
||||
patch_script_path.write_text(patch_script)
|
||||
|
||||
# Update the config with info about tenants and timelines
|
||||
@@ -2553,7 +2550,7 @@ class NeonStorageController(MetricsGetter, LogUtils):
|
||||
desired_availability: Optional[PageserverAvailability],
|
||||
desired_scheduling_policy: Optional[PageserverSchedulingPolicy],
|
||||
max_attempts: int,
|
||||
backoff: int,
|
||||
backoff: float,
|
||||
):
|
||||
"""
|
||||
Poll the node status until it reaches 'desired_scheduling_policy' and 'desired_availability'
|
||||
@@ -2948,7 +2945,7 @@ class NeonPageserver(PgProtocol, LogUtils):
|
||||
self.id
|
||||
):
|
||||
self.env.storage_controller.poll_node_status(
|
||||
self.id, PageserverAvailability.ACTIVE, None, max_attempts=20, backoff=1
|
||||
self.id, PageserverAvailability.ACTIVE, None, max_attempts=200, backoff=0.1
|
||||
)
|
||||
|
||||
return self
|
||||
@@ -4617,7 +4614,8 @@ class StorageScrubber:
|
||||
"REGION": s3_storage.bucket_region,
|
||||
"BUCKET": s3_storage.bucket_name,
|
||||
"BUCKET_PREFIX": s3_storage.prefix_in_bucket,
|
||||
"RUST_LOG": "DEBUG",
|
||||
"RUST_LOG": "INFO",
|
||||
"PAGESERVER_DISABLE_FILE_LOGGING": "1",
|
||||
}
|
||||
env.update(s3_storage.access_env_vars())
|
||||
|
||||
@@ -4637,10 +4635,8 @@ class StorageScrubber:
|
||||
(output_path, stdout, status_code) = subprocess_capture(
|
||||
self.log_dir,
|
||||
args,
|
||||
echo_stderr=True,
|
||||
echo_stdout=True,
|
||||
env=env,
|
||||
check=False,
|
||||
check=True,
|
||||
capture_stdout=True,
|
||||
timeout=timeout,
|
||||
)
|
||||
|
||||
@@ -198,9 +198,6 @@ def test_hot_standby_gc(neon_env_builder: NeonEnvBuilder, pause_apply: bool):
|
||||
|
||||
def run_pgbench(connstr: str, pg_bin: PgBin):
|
||||
log.info(f"Start a pgbench workload on pg {connstr}")
|
||||
# s10 is about 150MB of data. In debug mode init takes about 15s on SSD.
|
||||
pg_bin.run_capture(["pgbench", "-i", "-I", "dtGvp", "-s10", connstr])
|
||||
log.info("pgbench init done")
|
||||
pg_bin.run_capture(["pgbench", "-T60", connstr])
|
||||
|
||||
|
||||
@@ -247,9 +244,15 @@ def test_hot_standby_feedback(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
|
||||
log.info(
|
||||
f"primary connstr is {primary.connstr()}, secondary connstr {secondary.connstr()}"
|
||||
)
|
||||
|
||||
# s10 is about 150MB of data. In debug mode init takes about 15s on SSD.
|
||||
pg_bin.run_capture(["pgbench", "-i", "-I", "dtGvp", "-s10", primary.connstr()])
|
||||
log.info("pgbench init done in primary")
|
||||
|
||||
t = threading.Thread(target=run_pgbench, args=(primary.connstr(), pg_bin))
|
||||
t.start()
|
||||
# Wait until pgbench_accounts is created + filled on replica *and*
|
||||
|
||||
# Wait until we see that the pgbench_accounts is created + filled on replica *and*
|
||||
# index is created. Otherwise index creation would conflict with
|
||||
# read queries and hs feedback won't save us.
|
||||
wait_until(60, 1.0, partial(pgbench_accounts_initialized, secondary))
|
||||
|
||||
@@ -10,11 +10,11 @@ from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import NeonEnv, PgBin
|
||||
|
||||
|
||||
#
|
||||
# Test branching, when a transaction is in prepared state
|
||||
#
|
||||
@pytest.mark.timeout(600)
|
||||
def test_lfc_resize(neon_simple_env: NeonEnv, pg_bin: PgBin):
|
||||
"""
|
||||
Test resizing the Local File Cache
|
||||
"""
|
||||
env = neon_simple_env
|
||||
endpoint = env.endpoints.create_start(
|
||||
"main",
|
||||
@@ -32,27 +32,48 @@ def test_lfc_resize(neon_simple_env: NeonEnv, pg_bin: PgBin):
|
||||
pg_bin.run_capture(["pgbench", "-i", f"-s{scale}", connstr])
|
||||
pg_bin.run_capture(["pgbench", "-c10", f"-T{n_resize}", "-Mprepared", "-S", connstr])
|
||||
|
||||
thread = threading.Thread(target=run_pgbench, args=(endpoint.connstr(),), daemon=True)
|
||||
# Initializing the pgbench database can be very slow, especially on debug builds.
|
||||
connstr = endpoint.connstr(options="-cstatement_timeout=300s")
|
||||
|
||||
thread = threading.Thread(target=run_pgbench, args=(connstr,), daemon=True)
|
||||
thread.start()
|
||||
|
||||
conn = endpoint.connect()
|
||||
cur = conn.cursor()
|
||||
|
||||
for _ in range(n_resize):
|
||||
# For as long as pgbench is running, twiddle the LFC size once a second.
|
||||
# Note that we launch this immediately, already while the "pgbench -i"
|
||||
# initialization step is still running. That's quite a different workload
|
||||
# than the actual pgbench benchamark run, so this gives us coverage of both.
|
||||
while thread.is_alive():
|
||||
size = random.randint(1, 512)
|
||||
cur.execute(f"alter system set neon.file_cache_size_limit='{size}MB'")
|
||||
cur.execute("select pg_reload_conf()")
|
||||
time.sleep(1)
|
||||
|
||||
cur.execute("alter system set neon.file_cache_size_limit='100MB'")
|
||||
cur.execute("select pg_reload_conf()")
|
||||
|
||||
thread.join()
|
||||
|
||||
lfc_file_path = f"{endpoint.pg_data_dir_path()}/file.cache"
|
||||
lfc_file_size = os.path.getsize(lfc_file_path)
|
||||
res = subprocess.run(["ls", "-sk", lfc_file_path], check=True, text=True, capture_output=True)
|
||||
lfc_file_blocks = re.findall("([0-9A-F]+)", res.stdout)[0]
|
||||
log.info(f"Size of LFC file {lfc_file_size}, blocks {lfc_file_blocks}")
|
||||
assert lfc_file_size <= 512 * 1024 * 1024
|
||||
# At the end, set it at 100 MB, and perform a final check that the disk usage
|
||||
# of the file is in that ballbark.
|
||||
#
|
||||
# We retry the check a few times, because it might take a while for the
|
||||
# system to react to changing the setting and shrinking the file.
|
||||
cur.execute("alter system set neon.file_cache_size_limit='100MB'")
|
||||
cur.execute("select pg_reload_conf()")
|
||||
nretries = 10
|
||||
while True:
|
||||
lfc_file_path = f"{endpoint.pg_data_dir_path()}/file.cache"
|
||||
lfc_file_size = os.path.getsize(lfc_file_path)
|
||||
res = subprocess.run(
|
||||
["ls", "-sk", lfc_file_path], check=True, text=True, capture_output=True
|
||||
)
|
||||
lfc_file_blocks = re.findall("([0-9A-F]+)", res.stdout)[0]
|
||||
log.info(f"Size of LFC file {lfc_file_size}, blocks {lfc_file_blocks}")
|
||||
assert lfc_file_size <= 512 * 1024 * 1024
|
||||
|
||||
if int(lfc_file_blocks) <= 128 * 1024 or nretries == 0:
|
||||
break
|
||||
|
||||
nretries = nretries - 1
|
||||
time.sleep(1)
|
||||
|
||||
assert int(lfc_file_blocks) <= 128 * 1024
|
||||
|
||||
@@ -122,6 +122,7 @@ def test_readonly_node_gc(neon_env_builder: NeonEnvBuilder):
|
||||
Test static endpoint is protected from GC by acquiring and renewing lsn leases.
|
||||
"""
|
||||
|
||||
LSN_LEASE_LENGTH = 8
|
||||
neon_env_builder.num_pageservers = 2
|
||||
# GC is manual triggered.
|
||||
env = neon_env_builder.init_start(
|
||||
@@ -139,7 +140,7 @@ def test_readonly_node_gc(neon_env_builder: NeonEnvBuilder):
|
||||
"image_creation_threshold": "1",
|
||||
"image_layer_creation_check_threshold": "0",
|
||||
# Short lease length to fit test.
|
||||
"lsn_lease_length": "3s",
|
||||
"lsn_lease_length": f"{LSN_LEASE_LENGTH}s",
|
||||
},
|
||||
initial_tenant_shard_count=2,
|
||||
)
|
||||
@@ -170,10 +171,14 @@ def test_readonly_node_gc(neon_env_builder: NeonEnvBuilder):
|
||||
with env.endpoints.create_start("main") as ep_main:
|
||||
with ep_main.cursor() as cur:
|
||||
cur.execute("CREATE TABLE t0(v0 int primary key, v1 text)")
|
||||
lsn = None
|
||||
lsn = Lsn(0)
|
||||
for i in range(2):
|
||||
lsn = generate_updates_on_main(env, ep_main, i)
|
||||
|
||||
# Round down to the closest LSN on page boundary (unnormalized).
|
||||
XLOG_BLCKSZ = 8192
|
||||
lsn = Lsn((int(lsn) // XLOG_BLCKSZ) * XLOG_BLCKSZ)
|
||||
|
||||
with env.endpoints.create_start(
|
||||
branch_name="main",
|
||||
endpoint_id="static",
|
||||
@@ -183,7 +188,8 @@ def test_readonly_node_gc(neon_env_builder: NeonEnvBuilder):
|
||||
cur.execute("SELECT count(*) FROM t0")
|
||||
assert cur.fetchone() == (ROW_COUNT,)
|
||||
|
||||
time.sleep(3)
|
||||
# Wait for static compute to renew lease at least once.
|
||||
time.sleep(LSN_LEASE_LENGTH / 2)
|
||||
|
||||
generate_updates_on_main(env, ep_main, i, end=100)
|
||||
|
||||
@@ -204,8 +210,9 @@ def test_readonly_node_gc(neon_env_builder: NeonEnvBuilder):
|
||||
# Do some update so we can increment latest_gc_cutoff
|
||||
generate_updates_on_main(env, ep_main, i, end=100)
|
||||
|
||||
# Wait for the existing lease to expire.
|
||||
time.sleep(LSN_LEASE_LENGTH)
|
||||
# Now trigger GC again, layers should be removed.
|
||||
time.sleep(4)
|
||||
for shard, ps in tenant_get_shards(env, env.initial_tenant):
|
||||
client = ps.http_client()
|
||||
gc_result = client.timeline_gc(shard, env.initial_timeline, 0)
|
||||
|
||||
@@ -4,6 +4,7 @@ import threading
|
||||
import time
|
||||
from collections import defaultdict
|
||||
from datetime import datetime, timezone
|
||||
from enum import Enum
|
||||
from typing import Any, Dict, List, Optional, Set, Tuple, Union
|
||||
|
||||
import pytest
|
||||
@@ -2466,6 +2467,87 @@ def test_storage_controller_validate_during_migration(neon_env_builder: NeonEnvB
|
||||
raise
|
||||
|
||||
|
||||
class MigrationFailpoints(Enum):
|
||||
# While only the origin is attached
|
||||
PRE_GENERATION_INC = "reconciler-live-migrate-pre-generation-inc"
|
||||
# While both locations are attached
|
||||
POST_NOTIFY = "reconciler-live-migrate-post-notify"
|
||||
# While only the destination is attached
|
||||
POST_DETACH = "reconciler-live-migrate-post-detach"
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"migration_failpoint",
|
||||
[
|
||||
MigrationFailpoints.PRE_GENERATION_INC,
|
||||
MigrationFailpoints.POST_NOTIFY,
|
||||
MigrationFailpoints.POST_DETACH,
|
||||
],
|
||||
)
|
||||
def test_storage_controller_proxy_during_migration(
|
||||
neon_env_builder: NeonEnvBuilder, migration_failpoint: MigrationFailpoints
|
||||
):
|
||||
"""
|
||||
If we send a proxied GET request to the controller during a migration, it should route
|
||||
the request to whichever pageserver was most recently issued a generation.
|
||||
|
||||
Reproducer for https://github.com/neondatabase/neon/issues/9062
|
||||
"""
|
||||
neon_env_builder.num_pageservers = 2
|
||||
neon_env_builder.enable_pageserver_remote_storage(s3_storage())
|
||||
env = neon_env_builder.init_configs()
|
||||
env.start()
|
||||
|
||||
tenant_id = env.initial_tenant
|
||||
timeline_id = env.initial_timeline
|
||||
env.neon_cli.create_tenant(tenant_id, timeline_id)
|
||||
|
||||
# Activate a failpoint that will cause live migration to get stuck _after_ the generation has been issued
|
||||
# to the new pageserver: this should result in requests routed to the new pageserver.
|
||||
env.storage_controller.configure_failpoints((migration_failpoint.value, "pause"))
|
||||
|
||||
origin_pageserver = env.get_tenant_pageserver(tenant_id)
|
||||
dest_ps_id = [p.id for p in env.pageservers if p.id != origin_pageserver.id][0]
|
||||
|
||||
try:
|
||||
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
|
||||
migrate_fut = executor.submit(
|
||||
env.storage_controller.tenant_shard_migrate,
|
||||
TenantShardId(tenant_id, 0, 0),
|
||||
dest_ps_id,
|
||||
)
|
||||
|
||||
def has_hit_migration_failpoint():
|
||||
expr = f"at failpoint {str(migration_failpoint.value)}"
|
||||
log.info(expr)
|
||||
assert env.storage_controller.log_contains(expr)
|
||||
|
||||
wait_until(10, 1, has_hit_migration_failpoint)
|
||||
|
||||
# This request should be routed to whichever pageserver holds the highest generation
|
||||
tenant_info = env.storage_controller.pageserver_api().tenant_status(
|
||||
tenant_id,
|
||||
)
|
||||
|
||||
if migration_failpoint in (
|
||||
MigrationFailpoints.POST_NOTIFY,
|
||||
MigrationFailpoints.POST_DETACH,
|
||||
):
|
||||
# We expect request to land on the destination
|
||||
assert tenant_info["generation"] == 2
|
||||
elif migration_failpoint == MigrationFailpoints.PRE_GENERATION_INC:
|
||||
# We expect request to land on the origin
|
||||
assert tenant_info["generation"] == 1
|
||||
|
||||
# Eventually migration completes
|
||||
env.storage_controller.configure_failpoints((migration_failpoint.value, "off"))
|
||||
migrate_fut.result()
|
||||
except:
|
||||
# Always disable 'pause' failpoints, even on failure, to avoid hanging in shutdown
|
||||
env.storage_controller.configure_failpoints((migration_failpoint.value, "off"))
|
||||
raise
|
||||
|
||||
|
||||
@run_only_on_default_postgres("this is like a 'unit test' against storcon db")
|
||||
def test_safekeeper_deployment_time_update(neon_env_builder: NeonEnvBuilder):
|
||||
env = neon_env_builder.init_configs()
|
||||
|
||||
Reference in New Issue
Block a user