mirror of
https://github.com/neondatabase/neon.git
synced 2026-02-23 04:20:37 +00:00
Compare commits
16 Commits
bayandin/t
...
jcsp/foo-b
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
f12e438c09 | ||
|
|
7bae78186b | ||
|
|
7e560dd00e | ||
|
|
684e924211 | ||
|
|
8ace9ea25f | ||
|
|
6a4f49b08b | ||
|
|
c6e89445e2 | ||
|
|
04f32b9526 | ||
|
|
6f2333f52b | ||
|
|
d447f49bc3 | ||
|
|
c5972389aa | ||
|
|
c4f5736d5a | ||
|
|
518f598e2d | ||
|
|
4b711caf5e | ||
|
|
2cf47b1477 | ||
|
|
7dcfcccf7c |
10
.github/workflows/_build-and-test-locally.yml
vendored
10
.github/workflows/_build-and-test-locally.yml
vendored
@@ -257,7 +257,15 @@ jobs:
|
||||
${cov_prefix} cargo nextest run $CARGO_FLAGS $CARGO_FEATURES -E 'package(remote_storage)' -E 'test(test_real_azure)'
|
||||
|
||||
- name: Install postgres binaries
|
||||
run: cp -a pg_install /tmp/neon/pg_install
|
||||
run: |
|
||||
# Use tar to copy files matching the pattern, preserving the paths in the destionation
|
||||
tar c \
|
||||
pg_install/v* \
|
||||
pg_install/build/*/src/test/regress/*.so \
|
||||
pg_install/build/*/src/test/regress/pg_regress \
|
||||
pg_install/build/*/src/test/isolation/isolationtester \
|
||||
pg_install/build/*/src/test/isolation/pg_isolation_regress \
|
||||
| tar x -C /tmp/neon
|
||||
|
||||
- name: Upload Neon artifact
|
||||
uses: ./.github/actions/upload
|
||||
|
||||
@@ -36,7 +36,7 @@ jobs:
|
||||
|
||||
strategy:
|
||||
matrix:
|
||||
arch: [ x64 ]
|
||||
arch: [ x64, arm64 ]
|
||||
|
||||
runs-on: ${{ fromJson(format('["self-hosted", "{0}"]', matrix.arch == 'arm64' && 'large-arm64' || 'large')) }}
|
||||
|
||||
@@ -79,9 +79,7 @@ jobs:
|
||||
push: true
|
||||
pull: true
|
||||
file: Dockerfile.build-tools
|
||||
cache-from: |
|
||||
type=registry,ref=cache.neon.build/build-tools:cache-${{ matrix.arch }}
|
||||
type=registry,ref=cache.neon.build/build-tools:cache-${{ matrix.arch }}-does-not-exist-2
|
||||
cache-from: type=registry,ref=cache.neon.build/build-tools:cache-${{ matrix.arch }}
|
||||
cache-to: ${{ github.ref_name == 'main' && format('type=registry,ref=cache.neon.build/build-tools:cache-{0},mode=max', matrix.arch) || '' }}
|
||||
tags: neondatabase/build-tools:${{ inputs.image-tag }}-${{ matrix.arch }}
|
||||
|
||||
|
||||
58
.github/workflows/build_and_test.yml
vendored
58
.github/workflows/build_and_test.yml
vendored
@@ -6,7 +6,6 @@ on:
|
||||
- main
|
||||
- release
|
||||
- release-proxy
|
||||
- bayandin/test
|
||||
pull_request:
|
||||
|
||||
defaults:
|
||||
@@ -28,7 +27,7 @@ env:
|
||||
|
||||
jobs:
|
||||
check-permissions:
|
||||
if: false
|
||||
if: ${{ !contains(github.event.pull_request.labels.*.name, 'run-no-ci') }}
|
||||
uses: ./.github/workflows/check-permissions.yml
|
||||
with:
|
||||
github-event-name: ${{ github.event_name }}
|
||||
@@ -79,6 +78,7 @@ jobs:
|
||||
id: build-tag
|
||||
|
||||
check-build-tools-image:
|
||||
needs: [ check-permissions ]
|
||||
uses: ./.github/workflows/check-build-tools-image.yml
|
||||
|
||||
build-build-tools-image:
|
||||
@@ -602,7 +602,20 @@ jobs:
|
||||
strategy:
|
||||
fail-fast: false
|
||||
matrix:
|
||||
version: [ v14, v15, v16, v17 ]
|
||||
version:
|
||||
# Much data was already generated on old PG versions with bullseye's
|
||||
# libraries, the locales of which can cause data incompatibilities.
|
||||
# However, new PG versions should check if they can be built on newer
|
||||
# images, as that reduces the support burden of old and ancient
|
||||
# distros.
|
||||
- pg: v14
|
||||
debian: bullseye-slim
|
||||
- pg: v15
|
||||
debian: bullseye-slim
|
||||
- pg: v16
|
||||
debian: bullseye-slim
|
||||
- pg: v17
|
||||
debian: bookworm-slim
|
||||
arch: [ x64, arm64 ]
|
||||
|
||||
runs-on: ${{ fromJson(format('["self-hosted", "{0}"]', matrix.arch == 'arm64' && 'large-arm64' || 'large')) }}
|
||||
@@ -645,41 +658,46 @@ jobs:
|
||||
context: .
|
||||
build-args: |
|
||||
GIT_VERSION=${{ github.event.pull_request.head.sha || github.sha }}
|
||||
PG_VERSION=${{ matrix.version }}
|
||||
PG_VERSION=${{ matrix.version.pg }}
|
||||
BUILD_TAG=${{ needs.tag.outputs.build-tag }}
|
||||
TAG=${{ needs.build-build-tools-image.outputs.image-tag }}
|
||||
DEBIAN_FLAVOR=${{ matrix.version.debian }}
|
||||
provenance: false
|
||||
push: true
|
||||
pull: true
|
||||
file: compute/Dockerfile.compute-node
|
||||
cache-from: type=registry,ref=cache.neon.build/compute-node-${{ matrix.version }}:cache-${{ matrix.arch }}
|
||||
cache-to: ${{ github.ref_name == 'main' && format('type=registry,ref=cache.neon.build/compute-node-{0}:cache-{1},mode=max', matrix.version, matrix.arch) || '' }}
|
||||
cache-from: type=registry,ref=cache.neon.build/compute-node-${{ matrix.version.pg }}:cache-${{ matrix.arch }}
|
||||
cache-to: ${{ github.ref_name == 'main' && format('type=registry,ref=cache.neon.build/compute-node-{0}:cache-{1},mode=max', matrix.version.pg, matrix.arch) || '' }}
|
||||
tags: |
|
||||
neondatabase/compute-node-${{ matrix.version }}:${{ needs.tag.outputs.build-tag }}-${{ matrix.arch }}
|
||||
neondatabase/compute-node-${{ matrix.version.pg }}:${{ needs.tag.outputs.build-tag }}-${{ matrix.arch }}
|
||||
|
||||
- name: Build neon extensions test image
|
||||
if: matrix.version == 'v16'
|
||||
if: matrix.version.pg == 'v16'
|
||||
uses: docker/build-push-action@v6
|
||||
with:
|
||||
context: .
|
||||
build-args: |
|
||||
GIT_VERSION=${{ github.event.pull_request.head.sha || github.sha }}
|
||||
PG_VERSION=${{ matrix.version }}
|
||||
PG_VERSION=${{ matrix.version.pg }}
|
||||
BUILD_TAG=${{ needs.tag.outputs.build-tag }}
|
||||
TAG=${{ needs.build-build-tools-image.outputs.image-tag }}
|
||||
DEBIAN_FLAVOR=${{ matrix.version.debian }}
|
||||
provenance: false
|
||||
push: true
|
||||
pull: true
|
||||
file: compute/Dockerfile.compute-node
|
||||
target: neon-pg-ext-test
|
||||
cache-from: type=registry,ref=cache.neon.build/neon-test-extensions-${{ matrix.version }}:cache-${{ matrix.arch }}
|
||||
cache-to: ${{ github.ref_name == 'main' && format('type=registry,ref=cache.neon.build/neon-test-extensions-{0}:cache-{1},mode=max', matrix.version, matrix.arch) || '' }}
|
||||
cache-from: type=registry,ref=cache.neon.build/neon-test-extensions-${{ matrix.version.pg }}:cache-${{ matrix.arch }}
|
||||
cache-to: ${{ github.ref_name == 'main' && format('type=registry,ref=cache.neon.build/neon-test-extensions-{0}:cache-{1},mode=max', matrix.version.pg, matrix.arch) || '' }}
|
||||
tags: |
|
||||
neondatabase/neon-test-extensions-${{ matrix.version }}:${{needs.tag.outputs.build-tag}}-${{ matrix.arch }}
|
||||
neondatabase/neon-test-extensions-${{ matrix.version.pg }}:${{needs.tag.outputs.build-tag}}-${{ matrix.arch }}
|
||||
|
||||
- name: Build compute-tools image
|
||||
# compute-tools are Postgres independent, so build it only once
|
||||
if: matrix.version == 'v17'
|
||||
# We pick 16, because that builds on debian 11 with older glibc (and is
|
||||
# thus compatible with newer glibc), rather than 17 on Debian 12, as
|
||||
# that isn't guaranteed to be compatible with Debian 11
|
||||
if: matrix.version.pg == 'v16'
|
||||
uses: docker/build-push-action@v6
|
||||
with:
|
||||
target: compute-tools-image
|
||||
@@ -688,6 +706,7 @@ jobs:
|
||||
GIT_VERSION=${{ github.event.pull_request.head.sha || github.sha }}
|
||||
BUILD_TAG=${{ needs.tag.outputs.build-tag }}
|
||||
TAG=${{ needs.build-build-tools-image.outputs.image-tag }}
|
||||
DEBIAN_FLAVOR=${{ matrix.version.debian }}
|
||||
provenance: false
|
||||
push: true
|
||||
pull: true
|
||||
@@ -843,6 +862,9 @@ jobs:
|
||||
needs: [ check-permissions, tag, test-images, vm-compute-node-image ]
|
||||
runs-on: ubuntu-22.04
|
||||
|
||||
permissions:
|
||||
id-token: write # for `aws-actions/configure-aws-credentials`
|
||||
|
||||
env:
|
||||
VERSIONS: v14 v15 v16 v17
|
||||
|
||||
@@ -887,13 +909,19 @@ jobs:
|
||||
docker buildx imagetools create -t neondatabase/neon-test-extensions-v16:latest \
|
||||
neondatabase/neon-test-extensions-v16:${{ needs.tag.outputs.build-tag }}
|
||||
|
||||
- name: Configure AWS-prod credentials
|
||||
if: github.ref_name == 'release'|| github.ref_name == 'release-proxy'
|
||||
uses: aws-actions/configure-aws-credentials@v4
|
||||
with:
|
||||
aws-region: eu-central-1
|
||||
mask-aws-account-id: true
|
||||
role-to-assume: ${{ secrets.PROD_GHA_OIDC_ROLE }}
|
||||
|
||||
- name: Login to prod ECR
|
||||
uses: docker/login-action@v3
|
||||
if: github.ref_name == 'release'|| github.ref_name == 'release-proxy'
|
||||
with:
|
||||
registry: 093970136003.dkr.ecr.eu-central-1.amazonaws.com
|
||||
username: ${{ secrets.PROD_GHA_RUNNER_LIMITED_AWS_ACCESS_KEY_ID }}
|
||||
password: ${{ secrets.PROD_GHA_RUNNER_LIMITED_AWS_SECRET_ACCESS_KEY }}
|
||||
|
||||
- name: Copy all images to prod ECR
|
||||
if: github.ref_name == 'release'|| github.ref_name == 'release-proxy'
|
||||
|
||||
@@ -33,7 +33,7 @@ jobs:
|
||||
IMAGE_TAG: |
|
||||
${{ hashFiles('Dockerfile.build-tools',
|
||||
'.github/workflows/check-build-tools-image.yml',
|
||||
'.github/workflows/build-build-tools-image.yml') }}-test
|
||||
'.github/workflows/build-build-tools-image.yml') }}
|
||||
run: |
|
||||
echo "image-tag=${IMAGE_TAG}" | tee -a $GITHUB_OUTPUT
|
||||
|
||||
|
||||
2
.github/workflows/trigger-e2e-tests.yml
vendored
2
.github/workflows/trigger-e2e-tests.yml
vendored
@@ -102,7 +102,7 @@ jobs:
|
||||
# Default set of platforms to run e2e tests on
|
||||
platforms='["docker", "k8s"]'
|
||||
|
||||
# If the PR changes vendor/, pgxn/ or libs/vm_monitor/ directories, or Dockerfile.compute-node, add k8s-neonvm to the list of platforms.
|
||||
# If the PR changes vendor/, pgxn/ or libs/vm_monitor/ directories, or compute/Dockerfile.compute-node, add k8s-neonvm to the list of platforms.
|
||||
# If the workflow run is not a pull request, add k8s-neonvm to the list.
|
||||
if [ "$GITHUB_EVENT_NAME" == "pull_request" ]; then
|
||||
for f in $(gh api "/repos/${GITHUB_REPOSITORY}/pulls/${PR_NUMBER}/files" --paginate --jq '.[].filename'); do
|
||||
|
||||
10
Cargo.lock
generated
10
Cargo.lock
generated
@@ -1321,7 +1321,6 @@ dependencies = [
|
||||
"clap",
|
||||
"comfy-table",
|
||||
"compute_api",
|
||||
"git-version",
|
||||
"humantime",
|
||||
"humantime-serde",
|
||||
"hyper 0.14.30",
|
||||
@@ -3578,7 +3577,6 @@ dependencies = [
|
||||
"anyhow",
|
||||
"camino",
|
||||
"clap",
|
||||
"git-version",
|
||||
"humantime",
|
||||
"pageserver",
|
||||
"pageserver_api",
|
||||
@@ -3617,7 +3615,6 @@ dependencies = [
|
||||
"enumset",
|
||||
"fail",
|
||||
"futures",
|
||||
"git-version",
|
||||
"hex",
|
||||
"hex-literal",
|
||||
"humantime",
|
||||
@@ -3737,7 +3734,6 @@ dependencies = [
|
||||
"clap",
|
||||
"criterion",
|
||||
"futures",
|
||||
"git-version",
|
||||
"hex-literal",
|
||||
"itertools 0.10.5",
|
||||
"once_cell",
|
||||
@@ -4307,7 +4303,6 @@ dependencies = [
|
||||
"fallible-iterator",
|
||||
"framed-websockets",
|
||||
"futures",
|
||||
"git-version",
|
||||
"hashbrown 0.14.5",
|
||||
"hashlink",
|
||||
"hex",
|
||||
@@ -5139,7 +5134,6 @@ dependencies = [
|
||||
"desim",
|
||||
"fail",
|
||||
"futures",
|
||||
"git-version",
|
||||
"hex",
|
||||
"humantime",
|
||||
"hyper 0.14.30",
|
||||
@@ -5702,7 +5696,6 @@ dependencies = [
|
||||
"futures",
|
||||
"futures-core",
|
||||
"futures-util",
|
||||
"git-version",
|
||||
"humantime",
|
||||
"hyper 0.14.30",
|
||||
"metrics",
|
||||
@@ -5730,7 +5723,6 @@ dependencies = [
|
||||
"diesel_migrations",
|
||||
"fail",
|
||||
"futures",
|
||||
"git-version",
|
||||
"hex",
|
||||
"humantime",
|
||||
"hyper 0.14.30",
|
||||
@@ -5783,7 +5775,6 @@ dependencies = [
|
||||
"either",
|
||||
"futures",
|
||||
"futures-util",
|
||||
"git-version",
|
||||
"hex",
|
||||
"humantime",
|
||||
"itertools 0.10.5",
|
||||
@@ -6715,6 +6706,7 @@ dependencies = [
|
||||
"criterion",
|
||||
"fail",
|
||||
"futures",
|
||||
"git-version",
|
||||
"hex",
|
||||
"hex-literal",
|
||||
"humantime",
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
[](https://neon.tech)
|
||||
|
||||
|
||||
foo
|
||||
|
||||
|
||||
# Neon
|
||||
|
||||
|
||||
@@ -3,13 +3,15 @@ ARG REPOSITORY=neondatabase
|
||||
ARG IMAGE=build-tools
|
||||
ARG TAG=pinned
|
||||
ARG BUILD_TAG
|
||||
ARG DEBIAN_FLAVOR=bullseye-slim
|
||||
|
||||
#########################################################################################
|
||||
#
|
||||
# Layer "build-deps"
|
||||
#
|
||||
#########################################################################################
|
||||
FROM debian:bullseye-slim AS build-deps
|
||||
FROM debian:$DEBIAN_FLAVOR AS build-deps
|
||||
ARG DEBIAN_FLAVOR
|
||||
RUN apt update && \
|
||||
apt install -y git autoconf automake libtool build-essential bison flex libreadline-dev \
|
||||
zlib1g-dev libxml2-dev libcurl4-openssl-dev libossp-uuid-dev wget pkg-config libssl-dev \
|
||||
@@ -1027,7 +1029,8 @@ RUN cd compute_tools && mold -run cargo build --locked --profile release-line-de
|
||||
#
|
||||
#########################################################################################
|
||||
|
||||
FROM debian:bullseye-slim AS compute-tools-image
|
||||
FROM debian:$DEBIAN_FLAVOR AS compute-tools-image
|
||||
ARG DEBIAN_FLAVOR
|
||||
|
||||
COPY --from=compute-tools /home/nonroot/target/release-line-debug-size-lto/compute_ctl /usr/local/bin/compute_ctl
|
||||
|
||||
@@ -1037,7 +1040,8 @@ COPY --from=compute-tools /home/nonroot/target/release-line-debug-size-lto/compu
|
||||
#
|
||||
#########################################################################################
|
||||
|
||||
FROM debian:bullseye-slim AS pgbouncer
|
||||
FROM debian:$DEBIAN_FLAVOR AS pgbouncer
|
||||
ARG DEBIAN_FLAVOR
|
||||
RUN set -e \
|
||||
&& apt-get update \
|
||||
&& apt-get install -y \
|
||||
@@ -1179,7 +1183,9 @@ ENV PGDATABASE=postgres
|
||||
# Put it all together into the final image
|
||||
#
|
||||
#########################################################################################
|
||||
FROM debian:bullseye-slim
|
||||
FROM debian:$DEBIAN_FLAVOR
|
||||
ARG DEBIAN_FLAVOR
|
||||
ENV DEBIAN_FLAVOR=$DEBIAN_FLAVOR
|
||||
# Add user postgres
|
||||
RUN mkdir /var/db && useradd -m -d /var/db/postgres postgres && \
|
||||
echo "postgres:test_console_pass" | chpasswd && \
|
||||
@@ -1211,21 +1217,34 @@ COPY --chmod=0644 compute/etc/neon_collector_autoscaling.yml /etc/neon_collector
|
||||
# Create remote extension download directory
|
||||
RUN mkdir /usr/local/download_extensions && chown -R postgres:postgres /usr/local/download_extensions
|
||||
|
||||
|
||||
# Install:
|
||||
# libreadline8 for psql
|
||||
# libicu67, locales for collations (including ICU and plpgsql_check)
|
||||
# liblz4-1 for lz4
|
||||
# libossp-uuid16 for extension ossp-uuid
|
||||
# libgeos, libgdal, libsfcgal1, libproj and libprotobuf-c1 for PostGIS
|
||||
# libgeos, libsfcgal1, and libprotobuf-c1 for PostGIS
|
||||
# libxml2, libxslt1.1 for xml2
|
||||
# libzstd1 for zstd
|
||||
# libboost* for rdkit
|
||||
# ca-certificates for communicating with s3 by compute_ctl
|
||||
RUN apt update && \
|
||||
|
||||
|
||||
RUN apt update && \
|
||||
case $DEBIAN_FLAVOR in \
|
||||
# Version-specific installs for Bullseye (PG14-PG16):
|
||||
# libicu67, locales for collations (including ICU and plpgsql_check)
|
||||
# libgdal28, libproj19 for PostGIS
|
||||
bullseye*) \
|
||||
VERSION_INSTALLS="libicu67 libgdal28 libproj19"; \
|
||||
;; \
|
||||
# Version-specific installs for Bookworm (PG17):
|
||||
# libicu72, locales for collations (including ICU and plpgsql_check)
|
||||
# libgdal32, libproj25 for PostGIS
|
||||
bookworm*) \
|
||||
VERSION_INSTALLS="libicu72 libgdal32 libproj25"; \
|
||||
;; \
|
||||
esac && \
|
||||
apt install --no-install-recommends -y \
|
||||
gdb \
|
||||
libicu67 \
|
||||
liblz4-1 \
|
||||
libreadline8 \
|
||||
libboost-iostreams1.74.0 \
|
||||
@@ -1234,8 +1253,6 @@ RUN apt update && \
|
||||
libboost-system1.74.0 \
|
||||
libossp-uuid16 \
|
||||
libgeos-c1v5 \
|
||||
libgdal28 \
|
||||
libproj19 \
|
||||
libprotobuf-c1 \
|
||||
libsfcgal1 \
|
||||
libxml2 \
|
||||
@@ -1244,7 +1261,8 @@ RUN apt update && \
|
||||
libcurl4-openssl-dev \
|
||||
locales \
|
||||
procps \
|
||||
ca-certificates && \
|
||||
ca-certificates \
|
||||
$VERSION_INSTALLS && \
|
||||
rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/* && \
|
||||
localedef -i en_US -c -f UTF-8 -A /usr/share/locale/locale.alias en_US.UTF-8
|
||||
|
||||
|
||||
@@ -195,7 +195,7 @@ metrics:
|
||||
-- Postgres creates temporary snapshot files of the form %X-%X.snap.%d.tmp. These
|
||||
-- temporary snapshot files are renamed to the actual snapshot files after they are
|
||||
-- completely built. We only WAL-log the completely built snapshot files.
|
||||
(SELECT COUNT(*) FROM pg_ls_logicalsnapdir() WHERE name LIKE '%.snap') AS num_logical_snapshot_files;
|
||||
(SELECT COUNT(*) FROM pg_ls_dir('pg_logical/snapshots') AS name WHERE name LIKE '%.snap') AS num_logical_snapshot_files;
|
||||
|
||||
# In all the below metrics, we cast LSNs to floats because Prometheus only supports floats.
|
||||
# It's probably fine because float64 can store integers from -2^53 to +2^53 exactly.
|
||||
@@ -244,4 +244,3 @@ metrics:
|
||||
SELECT slot_name,
|
||||
CASE WHEN wal_status = 'lost' THEN 1 ELSE 0 END AS wal_is_lost
|
||||
FROM pg_replication_slots;
|
||||
|
||||
|
||||
@@ -9,7 +9,6 @@ anyhow.workspace = true
|
||||
camino.workspace = true
|
||||
clap.workspace = true
|
||||
comfy-table.workspace = true
|
||||
git-version.workspace = true
|
||||
humantime.workspace = true
|
||||
nix.workspace = true
|
||||
once_cell.workspace = true
|
||||
|
||||
@@ -4,8 +4,8 @@ use std::{str::FromStr, time::Duration};
|
||||
use clap::{Parser, Subcommand};
|
||||
use pageserver_api::{
|
||||
controller_api::{
|
||||
NodeAvailabilityWrapper, NodeDescribeResponse, NodeShardResponse, ShardSchedulingPolicy,
|
||||
TenantCreateRequest, TenantDescribeResponse, TenantPolicyRequest,
|
||||
AvailabilityZone, NodeAvailabilityWrapper, NodeDescribeResponse, NodeShardResponse,
|
||||
ShardSchedulingPolicy, TenantCreateRequest, TenantDescribeResponse, TenantPolicyRequest,
|
||||
},
|
||||
models::{
|
||||
EvictionPolicy, EvictionPolicyLayerAccessThreshold, LocationConfigSecondary,
|
||||
@@ -339,7 +339,7 @@ async fn main() -> anyhow::Result<()> {
|
||||
listen_pg_port,
|
||||
listen_http_addr,
|
||||
listen_http_port,
|
||||
availability_zone_id,
|
||||
availability_zone_id: AvailabilityZone(availability_zone_id),
|
||||
}),
|
||||
)
|
||||
.await?;
|
||||
|
||||
343
docs/rfcs/038-independent-compute-release.md
Normal file
343
docs/rfcs/038-independent-compute-release.md
Normal file
@@ -0,0 +1,343 @@
|
||||
# Independent compute release
|
||||
|
||||
Created at: 2024-08-30. Author: Alexey Kondratov (@ololobus)
|
||||
|
||||
## Summary
|
||||
|
||||
This document proposes an approach to fully independent compute release flow. It attempts to
|
||||
cover the following features:
|
||||
|
||||
- Process is automated as much as possible to minimize human errors.
|
||||
- Compute<->storage protocol compatibility is ensured.
|
||||
- A transparent release history is available with an easy rollback strategy.
|
||||
- Although not in the scope of this document, there is a viable way to extend the proposed release
|
||||
flow to achieve the canary and/or blue-green deployment strategies.
|
||||
|
||||
## Motivation
|
||||
|
||||
Previously, the compute release was tightly coupled to the storage release. This meant that once
|
||||
some storage nodes got restarted with a newer version, all new compute starts using these nodes
|
||||
automatically got a new version. Thus, two releases happen in parallel, which increases the blast
|
||||
radius and makes ownership fuzzy.
|
||||
|
||||
Now, we practice a manual v0 independent compute release flow -- after getting a new compute release
|
||||
image and tag, we pin it region by region using Admin UI. It's better, but it still has its own flaws:
|
||||
|
||||
1. It's a simple but fairly manual process, as you need to click through a few pages.
|
||||
2. It's prone to human errors, e.g., you could mistype or copy the wrong compute tag.
|
||||
3. We now require an additional approval in the Admin UI, which partially solves the 2.,
|
||||
but also makes the whole process pretty annoying, as you constantly need to go back
|
||||
and forth between two people.
|
||||
|
||||
## Non-goals
|
||||
|
||||
It's not the goal of this document to propose a design for some general-purpose release tool like Helm.
|
||||
The document considers how the current compute fleet is orchestrated at Neon. Even if we later
|
||||
decide to split the control plane further (e.g., introduce a separate compute controller), the proposed
|
||||
release process shouldn't change much, i.e., the releases table and API will reside in
|
||||
one of the parts.
|
||||
|
||||
Achieving the canary and/or blue-green deploy strategies is out of the scope of this document. They
|
||||
were kept in mind, though, so it's expected that the proposed approach will lay down the foundation
|
||||
for implementing them in future iterations.
|
||||
|
||||
## Impacted components
|
||||
|
||||
Compute, control plane, CI, observability (some Grafana dashboards may require changes).
|
||||
|
||||
## Prior art
|
||||
|
||||
One of the very close examples is how Helm tracks [releases history](https://helm.sh/docs/helm/helm_history/).
|
||||
|
||||
In the code:
|
||||
|
||||
- [Release](https://github.com/helm/helm/blob/2b30cf4b61d587d3f7594102bb202b787b9918db/pkg/release/release.go#L20-L43)
|
||||
- [Release info](https://github.com/helm/helm/blob/2b30cf4b61d587d3f7594102bb202b787b9918db/pkg/release/info.go#L24-L40)
|
||||
- [Release status](https://github.com/helm/helm/blob/2b30cf4b61d587d3f7594102bb202b787b9918db/pkg/release/status.go#L18-L42)
|
||||
|
||||
TL;DR it has several important attributes:
|
||||
|
||||
- Revision -- unique release ID/primary key. It is not the same as the application version,
|
||||
because the same version can be deployed several times, e.g., after a newer version rollback.
|
||||
- App version -- version of the application chart/code.
|
||||
- Config -- set of overrides to the default config of the application.
|
||||
- Status -- current status of the release in the history.
|
||||
- Timestamps -- tracks when a release was created and deployed.
|
||||
|
||||
## Proposed implementation
|
||||
|
||||
### Separate release branch
|
||||
|
||||
We will use a separate release branch, `release-compute`, to have a clean history for releases and commits.
|
||||
In order to avoid confusion with storage releases, we will use a different prefix for compute [git release
|
||||
tags](https://github.com/neondatabase/neon/releases) -- `release-compute-XXXX`. We will use the same tag for
|
||||
Docker images as well. The `neondatabase/compute-node-v16:release-compute-XXXX` looks longer and a bit redundant,
|
||||
but it's better to have image and git tags in sync.
|
||||
|
||||
Currently, control plane relies on the numeric compute and storage release versions to decide on compute->storage
|
||||
compatibility. Once we implement this proposal, we should drop this code as release numbers will be completely
|
||||
independent. The only constraint we want is that it must monotonically increase within the same release branch.
|
||||
|
||||
### Compute config/settings manifest
|
||||
|
||||
We will create a new sub-directory `compute` and file `compute/manifest.yaml` with a structure:
|
||||
|
||||
```yaml
|
||||
pg_settings:
|
||||
# Common settings for primaries and secondaries of all versions.
|
||||
common:
|
||||
wal_log_hints: "off"
|
||||
max_wal_size: "1024"
|
||||
|
||||
per_version:
|
||||
14:
|
||||
# Common settings for both replica and primary of version PG 14
|
||||
common:
|
||||
shared_preload_libraries: "neon,pg_stat_statements,extension_x"
|
||||
15:
|
||||
common:
|
||||
shared_preload_libraries: "neon,pg_stat_statements,extension_x"
|
||||
# Settings that should be applied only to
|
||||
replica:
|
||||
# Available only starting Postgres 15th
|
||||
recovery_prefetch: "off"
|
||||
# ...
|
||||
17:
|
||||
common:
|
||||
# For example, if third-party `extension_x` is not yet available for PG 17
|
||||
shared_preload_libraries: "neon,pg_stat_statements"
|
||||
replica:
|
||||
recovery_prefetch: "off"
|
||||
```
|
||||
|
||||
**N.B.** Setting value should be a string with `on|off` for booleans and a number (as a string)
|
||||
without units for all numeric settings. That's how the control plane currently operates.
|
||||
|
||||
The priority of settings will be (a higher number is a higher priority):
|
||||
|
||||
1. Any static and hard-coded settings in the control plane
|
||||
2. `pg_settings->common`
|
||||
3. Per-version `common`
|
||||
4. Per-version `replica`
|
||||
5. Any per-user/project/endpoint overrides in the control plane
|
||||
6. Any dynamic setting calculated based on the compute size
|
||||
|
||||
**N.B.** For simplicity, we do not do any custom logic for `shared_preload_libraries`, so it's completely
|
||||
overridden if specified on some level. Make sure that you include all necessary extensions in it when you
|
||||
do any overrides.
|
||||
|
||||
**N.B.** There is a tricky question about what to do with custom compute image pinning we sometimes
|
||||
do for particular projects and customers. That's usually some ad-hoc work and images are based on
|
||||
the latest compute image, so it's relatively safe to assume that we could use settings from the latest compute
|
||||
release. If for some reason that's not true, and further overrides are needed, it's also possible to do
|
||||
on the project level together with pinning the image, so it's on-call/engineer/support responsibility to
|
||||
ensure that compute starts with the specified custom image. The only real risk is that compute image will get
|
||||
stale and settings from new releases will drift away, so eventually it will get something incompatible,
|
||||
but i) this is some operational issue, as we do not want stale images anyway, and ii) base settings
|
||||
receive something really new so rarely that the chance of this happening is very low. If we want to solve it completely,
|
||||
then together with pinning the image we could also pin the matching release revision in the control plane.
|
||||
|
||||
The compute team will own the content of `compute/manifest.yaml`.
|
||||
|
||||
### Control plane: releases table
|
||||
|
||||
In order to store information about releases, the control plane will use a table `compute_releases` with the following
|
||||
schema:
|
||||
|
||||
```sql
|
||||
CREATE TABLE compute_releases (
|
||||
-- Unique release ID
|
||||
-- N.B. Revision won't by synchronized across all regions, because all control planes are technically independent
|
||||
-- services. We have the same situation with Helm releases as well because they could be deployed and rolled back
|
||||
-- independently in different clusters.
|
||||
revision BIGSERIAL PRIMARY KEY,
|
||||
-- Numeric version of the compute image, e.g. 9057
|
||||
version BIGINT NOT NULL,
|
||||
-- Compute image tag, e.g. `release-9057`
|
||||
tag TEXT NOT NULL,
|
||||
-- Current release status. Currently, it will be a simple enum
|
||||
-- * `deployed` -- release is deployed and used for new compute starts.
|
||||
-- Exactly one release can have this status at a time.
|
||||
-- * `superseded` -- release has been replaced by a newer one.
|
||||
-- But we can always extend it in the future when we need more statuses
|
||||
-- for more complex deployment strategies.
|
||||
status TEXT NOT NULL,
|
||||
-- Any additional metadata for compute in the corresponding release
|
||||
manifest JSONB NOT NULL,
|
||||
-- Timestamp when release record was created in the control plane database
|
||||
created_at TIMESTAMP NOT NULL DEFAULT now(),
|
||||
-- Timestamp when release deployment was finished
|
||||
deployed_at TIMESTAMP
|
||||
);
|
||||
```
|
||||
|
||||
We keep track of the old releases not only for the sake of audit, but also because we usually have ~30% of
|
||||
old computes started using the image from one of the previous releases. Yet, when users want to reconfigure
|
||||
them without restarting, the control plane needs to know what settings are applicable to them, so we also need
|
||||
information about the previous releases that are readily available. There could be some other auxiliary info
|
||||
needed as well: supported extensions, compute flags, etc.
|
||||
|
||||
**N.B.** Here, we can end up in an ambiguous situation when the same compute image is deployed twice, e.g.,
|
||||
it was deployed once, then rolled back, and then deployed again, potentially with a different manifest. Yet,
|
||||
we could've started some computes with the first deployment and some with the second. Thus, when we need to
|
||||
look up the manifest for the compute by its image tag, we will see two records in the table with the same tag,
|
||||
but different revision numbers. We can assume that this could happen only in case of rollbacks, so we
|
||||
can just take the latest revision for the given tag.
|
||||
|
||||
### Control plane: management API
|
||||
|
||||
The control plane will implement new API methods to manage releases:
|
||||
|
||||
1. `POST /management/api/v2/compute_releases` to create a new release. With payload
|
||||
|
||||
```json
|
||||
{
|
||||
"version": 9057,
|
||||
"tag": "release-9057",
|
||||
"manifest": {}
|
||||
}
|
||||
```
|
||||
|
||||
and response
|
||||
|
||||
```json
|
||||
{
|
||||
"revision": 53,
|
||||
"version": 9057,
|
||||
"tag": "release-9057",
|
||||
"status": "deployed",
|
||||
"manifest": {},
|
||||
"created_at": "2024-08-15T15:52:01.0000Z",
|
||||
"deployed_at": "2024-08-15T15:52:01.0000Z",
|
||||
}
|
||||
```
|
||||
|
||||
Here, we can actually mix-in custom (remote) extensions metadata into the `manifest`, so that the control plane
|
||||
will get information about all available extensions not bundled into compute image. The corresponding
|
||||
workflow in `neondatabase/build-custom-extensions` should produce it as an artifact and make
|
||||
it accessible to the workflow in the `neondatabase/infra`. See the complete release flow below. Doing that,
|
||||
we put a constraint that new custom extension requires new compute release, which is good for the safety,
|
||||
but is not exactly what we want operational-wise (we want to be able to deploy new extensions without new
|
||||
images). Yet, it can be solved incrementally: v0 -- do not do anything with extensions at all;
|
||||
v1 -- put them into the same manifest; v2 -- make them separate entities with their own lifecycle.
|
||||
|
||||
**N.B.** This method is intended to be used in CI workflows, and CI/network can be flaky. It's reasonable
|
||||
to assume that we could retry the request several times, even though it's already succeeded. Although it's
|
||||
not a big deal to create several identical releases one-by-one, it's better to avoid it, so the control plane
|
||||
should check if the latest release is identical and just return `304 Not Modified` in this case.
|
||||
|
||||
2. `POST /management/api/v2/compute_releases/rollback` to rollback to any previously deployed release. With payload
|
||||
including the revision of the release to rollback to:
|
||||
|
||||
```json
|
||||
{
|
||||
"revision": 52
|
||||
}
|
||||
```
|
||||
|
||||
Rollback marks the current release as `superseded` and creates a new release with all the same data as the
|
||||
requested revision, but with a new revision number.
|
||||
|
||||
This rollback API is not strictly needed, as we can just use `infra` repo workflow to deploy any
|
||||
available tag. It's still nice to have for on-call and any urgent matters, for example, if we need
|
||||
to rollback and GitHub is down. It's much easier to specify only the revision number vs. crafting
|
||||
all the necessary data for the new release payload.
|
||||
|
||||
### Compute->storage compatibility tests
|
||||
|
||||
In order to safely release new compute versions independently from storage, we need to ensure that the currently
|
||||
deployed storage is compatible with the new compute version. Currently, we maintain backward compatibility
|
||||
in storage, but newer computes may require a newer storage version.
|
||||
|
||||
Remote end-to-end (e2e) tests [already accept](https://github.com/neondatabase/cloud/blob/e3468d433e0d73d02b7d7e738d027f509b522408/.github/workflows/testing.yml#L43-L48)
|
||||
`storage_image_tag` and `compute_image_tag` as separate inputs. That means that we could reuse e2e tests to ensure
|
||||
compatibility between storage and compute:
|
||||
|
||||
1. Pick the latest storage release tag and use it as `storage_image_tag`.
|
||||
2. Pick a new compute tag built in the current compute release PR and use it as `compute_image_tag`.
|
||||
Here, we should use a temporary ECR image tag, because the final tag will be known only after the release PR is merged.
|
||||
3. Trigger e2e tests as usual.
|
||||
|
||||
### Release flow
|
||||
|
||||
```mermaid
|
||||
sequenceDiagram
|
||||
|
||||
actor oncall as Compute on-call person
|
||||
participant neon as neondatabase/neon
|
||||
|
||||
box private
|
||||
participant cloud as neondatabase/cloud
|
||||
participant exts as neondatabase/build-custom-extensions
|
||||
participant infra as neondatabase/infra
|
||||
end
|
||||
|
||||
box cloud
|
||||
participant preprod as Pre-prod control plane
|
||||
participant prod as Production control plane
|
||||
participant k8s as Compute k8s
|
||||
end
|
||||
|
||||
oncall ->> neon: Open release PR into release-compute
|
||||
|
||||
activate neon
|
||||
neon ->> cloud: CI: trigger e2e compatibility tests
|
||||
activate cloud
|
||||
cloud -->> neon: CI: e2e tests pass
|
||||
deactivate cloud
|
||||
neon ->> neon: CI: pass PR checks, get approvals
|
||||
deactivate neon
|
||||
|
||||
oncall ->> neon: Merge release PR into release-compute
|
||||
|
||||
activate neon
|
||||
neon ->> neon: CI: pass checks, build and push images
|
||||
neon ->> exts: CI: trigger extensions build
|
||||
activate exts
|
||||
exts -->> neon: CI: extensions are ready
|
||||
deactivate exts
|
||||
neon ->> neon: CI: create release tag
|
||||
neon ->> infra: Trigger release workflow using the produced tag
|
||||
deactivate neon
|
||||
|
||||
activate infra
|
||||
infra ->> infra: CI: pass checks
|
||||
infra ->> preprod: Release new compute image to pre-prod automatically <br/> POST /management/api/v2/compute_releases
|
||||
activate preprod
|
||||
preprod -->> infra: 200 OK
|
||||
deactivate preprod
|
||||
|
||||
infra ->> infra: CI: wait for per-region production deploy approvals
|
||||
oncall ->> infra: CI: approve deploys region by region
|
||||
infra ->> k8s: Prewarm new compute image
|
||||
infra ->> prod: POST /management/api/v2/compute_releases
|
||||
activate prod
|
||||
prod -->> infra: 200 OK
|
||||
deactivate prod
|
||||
deactivate infra
|
||||
```
|
||||
|
||||
## Further work
|
||||
|
||||
As briefly mentioned in other sections, eventually, we would like to use more complex deployment strategies.
|
||||
For example, we can pass a fraction of the total compute starts that should use the new release. Then we can
|
||||
mark the release as `partial` or `canary` and monitor its performance. If everything is fine, we can promote it
|
||||
to `deployed` status. If not, we can roll back to the previous one.
|
||||
|
||||
## Alternatives
|
||||
|
||||
In theory, we can try using Helm as-is:
|
||||
|
||||
1. Write a compute Helm chart. That will actually have only some config map, which the control plane can access and read.
|
||||
N.B. We could reuse the control plane chart as well, but then it's not a fully independent release again and even more fuzzy.
|
||||
2. The control plane will read it and start using the new compute version for new starts.
|
||||
|
||||
Drawbacks:
|
||||
|
||||
1. Helm releases work best if the workload is controlled by the Helm chart itself. Then you can have different
|
||||
deployment strategies like rolling update or canary or blue/green deployments. At Neon, the compute starts are controlled
|
||||
by control plane, so it makes it much more tricky.
|
||||
2. Releases visibility will suffer, i.e. instead of a nice table in the control plane and Admin UI, we would need to use
|
||||
`helm` cli and/or K8s UIs like K8sLens.
|
||||
3. We do not restart all computes shortly after the new version release. This means that for some features and compatibility
|
||||
purpose (see above) control plane may need some auxiliary info from the previous releases.
|
||||
@@ -1,4 +1,5 @@
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::fmt::Display;
|
||||
use std::str::FromStr;
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
@@ -57,7 +58,7 @@ pub struct NodeRegisterRequest {
|
||||
pub listen_http_addr: String,
|
||||
pub listen_http_port: u16,
|
||||
|
||||
pub availability_zone_id: String,
|
||||
pub availability_zone_id: AvailabilityZone,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
@@ -74,10 +75,19 @@ pub struct TenantPolicyRequest {
|
||||
pub scheduling: Option<ShardSchedulingPolicy>,
|
||||
}
|
||||
|
||||
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
|
||||
pub struct AvailabilityZone(pub String);
|
||||
|
||||
impl Display for AvailabilityZone {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "{}", self.0)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
pub struct ShardsPreferredAzsRequest {
|
||||
#[serde(flatten)]
|
||||
pub preferred_az_ids: HashMap<TenantShardId, String>,
|
||||
pub preferred_az_ids: HashMap<TenantShardId, AvailabilityZone>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
|
||||
@@ -19,6 +19,7 @@ bincode.workspace = true
|
||||
bytes.workspace = true
|
||||
camino.workspace = true
|
||||
chrono.workspace = true
|
||||
git-version.workspace = true
|
||||
hex = { workspace = true, features = ["serde"] }
|
||||
humantime.workspace = true
|
||||
hyper = { workspace = true, features = ["full"] }
|
||||
|
||||
@@ -92,6 +92,10 @@ pub mod toml_edit_ext;
|
||||
|
||||
pub mod circuit_breaker;
|
||||
|
||||
// Re-export used in macro. Avoids adding git-version as dep in target crates.
|
||||
#[doc(hidden)]
|
||||
pub use git_version;
|
||||
|
||||
/// This is a shortcut to embed git sha into binaries and avoid copying the same build script to all packages
|
||||
///
|
||||
/// we have several cases:
|
||||
@@ -131,7 +135,7 @@ macro_rules! project_git_version {
|
||||
($const_identifier:ident) => {
|
||||
// this should try GIT_VERSION first only then git_version::git_version!
|
||||
const $const_identifier: &::core::primitive::str = {
|
||||
const __COMMIT_FROM_GIT: &::core::primitive::str = git_version::git_version! {
|
||||
const __COMMIT_FROM_GIT: &::core::primitive::str = $crate::git_version::git_version! {
|
||||
prefix = "",
|
||||
fallback = "unknown",
|
||||
args = ["--abbrev=40", "--always", "--dirty=-modified"] // always use full sha
|
||||
|
||||
@@ -27,7 +27,6 @@ crc32c.workspace = true
|
||||
either.workspace = true
|
||||
fail.workspace = true
|
||||
futures.workspace = true
|
||||
git-version.workspace = true
|
||||
hex.workspace = true
|
||||
humantime.workspace = true
|
||||
humantime-serde.workspace = true
|
||||
|
||||
@@ -12,7 +12,6 @@ anyhow.workspace = true
|
||||
async-stream.workspace = true
|
||||
clap = { workspace = true, features = ["string"] }
|
||||
futures.workspace = true
|
||||
git-version.workspace = true
|
||||
itertools.workspace = true
|
||||
once_cell.workspace = true
|
||||
pageserver_api.workspace = true
|
||||
|
||||
@@ -10,7 +10,6 @@ license.workspace = true
|
||||
anyhow.workspace = true
|
||||
camino.workspace = true
|
||||
clap = { workspace = true, features = ["string"] }
|
||||
git-version.workspace = true
|
||||
humantime.workspace = true
|
||||
pageserver = { path = ".." }
|
||||
pageserver_api.workspace = true
|
||||
|
||||
@@ -2,7 +2,7 @@ use std::collections::HashMap;
|
||||
|
||||
use futures::Future;
|
||||
use pageserver_api::{
|
||||
controller_api::NodeRegisterRequest,
|
||||
controller_api::{AvailabilityZone, NodeRegisterRequest},
|
||||
shard::TenantShardId,
|
||||
upcall_api::{
|
||||
ReAttachRequest, ReAttachResponse, ReAttachResponseTenant, ValidateRequest,
|
||||
@@ -148,10 +148,10 @@ impl ControlPlaneGenerationsApi for ControlPlaneClient {
|
||||
.and_then(|jv| jv.as_str().map(|str| str.to_owned()));
|
||||
|
||||
match az_id_from_metadata {
|
||||
Some(az_id) => Some(az_id),
|
||||
Some(az_id) => Some(AvailabilityZone(az_id)),
|
||||
None => {
|
||||
tracing::warn!("metadata.json does not contain an 'availability_zone_id' field");
|
||||
conf.availability_zone.clone()
|
||||
conf.availability_zone.clone().map(AvailabilityZone)
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
@@ -589,6 +589,10 @@ async fn timeline_create_handler(
|
||||
StatusCode::SERVICE_UNAVAILABLE,
|
||||
HttpErrorBody::from_msg(e.to_string()),
|
||||
),
|
||||
Err(e @ tenant::CreateTimelineError::AncestorArchived) => json_response(
|
||||
StatusCode::NOT_ACCEPTABLE,
|
||||
HttpErrorBody::from_msg(e.to_string()),
|
||||
),
|
||||
Err(tenant::CreateTimelineError::ShuttingDown) => json_response(
|
||||
StatusCode::SERVICE_UNAVAILABLE,
|
||||
HttpErrorBody::from_msg("tenant shutting down".to_string()),
|
||||
|
||||
@@ -563,6 +563,8 @@ pub enum CreateTimelineError {
|
||||
AncestorLsn(anyhow::Error),
|
||||
#[error("ancestor timeline is not active")]
|
||||
AncestorNotActive,
|
||||
#[error("ancestor timeline is archived")]
|
||||
AncestorArchived,
|
||||
#[error("tenant shutting down")]
|
||||
ShuttingDown,
|
||||
#[error(transparent)]
|
||||
@@ -1698,6 +1700,11 @@ impl Tenant {
|
||||
return Err(CreateTimelineError::AncestorNotActive);
|
||||
}
|
||||
|
||||
if ancestor_timeline.is_archived() == Some(true) {
|
||||
info!("tried to branch archived timeline");
|
||||
return Err(CreateTimelineError::AncestorArchived);
|
||||
}
|
||||
|
||||
if let Some(lsn) = ancestor_start_lsn.as_mut() {
|
||||
*lsn = lsn.align();
|
||||
|
||||
|
||||
@@ -5,6 +5,7 @@ use itertools::Itertools;
|
||||
use super::storage_layer::LayerName;
|
||||
|
||||
/// Checks whether a layer map is valid (i.e., is a valid result of the current compaction algorithm if nothing goes wrong).
|
||||
///
|
||||
/// The function checks if we can split the LSN range of a delta layer only at the LSNs of the delta layers. For example,
|
||||
///
|
||||
/// ```plain
|
||||
|
||||
@@ -112,7 +112,7 @@ use pageserver_api::reltag::RelTag;
|
||||
use pageserver_api::shard::ShardIndex;
|
||||
|
||||
use postgres_connection::PgConnectionConfig;
|
||||
use postgres_ffi::to_pg_timestamp;
|
||||
use postgres_ffi::{to_pg_timestamp, v14::xlog_utils, WAL_SEGMENT_SIZE};
|
||||
use utils::{
|
||||
completion,
|
||||
generation::Generation,
|
||||
@@ -1337,6 +1337,10 @@ impl Timeline {
|
||||
_ctx: &RequestContext,
|
||||
) -> anyhow::Result<LsnLease> {
|
||||
let lease = {
|
||||
// Normalize the requested LSN to be aligned, and move to the first record
|
||||
// if it points to the beginning of the page (header).
|
||||
let lsn = xlog_utils::normalize_lsn(lsn, WAL_SEGMENT_SIZE);
|
||||
|
||||
let mut gc_info = self.gc_info.write().unwrap();
|
||||
|
||||
let valid_until = SystemTime::now() + length;
|
||||
@@ -3597,7 +3601,7 @@ impl Timeline {
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
.map_err(|e| FlushLayerError::from_anyhow(self, e))?;
|
||||
.map_err(|e| FlushLayerError::from_anyhow(self, e.into()))?;
|
||||
|
||||
if self.cancel.is_cancelled() {
|
||||
return Err(FlushLayerError::Cancelled);
|
||||
@@ -3836,16 +3840,20 @@ impl Timeline {
|
||||
partition_size: u64,
|
||||
flags: EnumSet<CompactFlags>,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<((KeyPartitioning, SparseKeyPartitioning), Lsn)> {
|
||||
) -> Result<((KeyPartitioning, SparseKeyPartitioning), Lsn), CompactionError> {
|
||||
let Ok(mut partitioning_guard) = self.partitioning.try_lock() else {
|
||||
// NB: there are two callers, one is the compaction task, of which there is only one per struct Tenant and hence Timeline.
|
||||
// The other is the initdb optimization in flush_frozen_layer, used by `boostrap_timeline`, which runs before `.activate()`
|
||||
// and hence before the compaction task starts.
|
||||
anyhow::bail!("repartition() called concurrently, this should not happen");
|
||||
return Err(CompactionError::Other(anyhow!(
|
||||
"repartition() called concurrently, this should not happen"
|
||||
)));
|
||||
};
|
||||
let ((dense_partition, sparse_partition), partition_lsn) = &*partitioning_guard;
|
||||
if lsn < *partition_lsn {
|
||||
anyhow::bail!("repartition() called with LSN going backwards, this should not happen");
|
||||
return Err(CompactionError::Other(anyhow!(
|
||||
"repartition() called with LSN going backwards, this should not happen"
|
||||
)));
|
||||
}
|
||||
|
||||
let distance = lsn.0 - partition_lsn.0;
|
||||
@@ -4447,6 +4455,12 @@ pub(crate) enum CompactionError {
|
||||
Other(anyhow::Error),
|
||||
}
|
||||
|
||||
impl CompactionError {
|
||||
pub fn is_cancelled(&self) -> bool {
|
||||
matches!(self, CompactionError::ShuttingDown)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<CollectKeySpaceError> for CompactionError {
|
||||
fn from(err: CollectKeySpaceError) -> Self {
|
||||
match err {
|
||||
|
||||
@@ -390,7 +390,7 @@ impl Timeline {
|
||||
// error but continue.
|
||||
//
|
||||
// Suppress error when it's due to cancellation
|
||||
if !self.cancel.is_cancelled() {
|
||||
if !self.cancel.is_cancelled() && !err.is_cancelled() {
|
||||
tracing::error!("could not compact, repartitioning keyspace failed: {err:?}");
|
||||
}
|
||||
(1, false)
|
||||
|
||||
@@ -25,7 +25,18 @@ SHLIB_LINK_INTERNAL = $(libpq)
|
||||
SHLIB_LINK = -lcurl
|
||||
|
||||
EXTENSION = neon
|
||||
DATA = neon--1.0.sql neon--1.0--1.1.sql neon--1.1--1.2.sql neon--1.2--1.3.sql neon--1.3--1.2.sql neon--1.2--1.1.sql neon--1.1--1.0.sql neon--1.3--1.4.sql neon--1.4--1.3.sql neon--1.4--1.5.sql neon--1.5--1.4.sql
|
||||
DATA = \
|
||||
neon--1.0.sql \
|
||||
neon--1.0--1.1.sql \
|
||||
neon--1.1--1.2.sql \
|
||||
neon--1.2--1.3.sql \
|
||||
neon--1.3--1.4.sql \
|
||||
neon--1.4--1.5.sql \
|
||||
neon--1.5--1.4.sql \
|
||||
neon--1.4--1.3.sql \
|
||||
neon--1.3--1.2.sql \
|
||||
neon--1.2--1.1.sql \
|
||||
neon--1.1--1.0.sql
|
||||
PGFILEDESC = "neon - cloud storage for PostgreSQL"
|
||||
|
||||
EXTRA_CLEAN = \
|
||||
|
||||
@@ -29,7 +29,6 @@ dashmap.workspace = true
|
||||
env_logger.workspace = true
|
||||
framed-websockets.workspace = true
|
||||
futures.workspace = true
|
||||
git-version.workspace = true
|
||||
hashbrown.workspace = true
|
||||
hashlink.workspace = true
|
||||
hex.workspace = true
|
||||
|
||||
@@ -21,7 +21,6 @@ chrono.workspace = true
|
||||
clap = { workspace = true, features = ["derive"] }
|
||||
crc32c.workspace = true
|
||||
fail.workspace = true
|
||||
git-version.workspace = true
|
||||
hex.workspace = true
|
||||
humantime.workspace = true
|
||||
hyper.workspace = true
|
||||
|
||||
@@ -15,7 +15,6 @@ const_format.workspace = true
|
||||
futures.workspace = true
|
||||
futures-core.workspace = true
|
||||
futures-util.workspace = true
|
||||
git-version.workspace = true
|
||||
humantime.workspace = true
|
||||
hyper = { workspace = true, features = ["full"] }
|
||||
once_cell.workspace = true
|
||||
|
||||
@@ -20,7 +20,6 @@ chrono.workspace = true
|
||||
clap.workspace = true
|
||||
fail.workspace = true
|
||||
futures.workspace = true
|
||||
git-version.workspace = true
|
||||
hex.workspace = true
|
||||
hyper.workspace = true
|
||||
humantime.workspace = true
|
||||
|
||||
@@ -515,7 +515,7 @@ async fn handle_tenant_timeline_passthrough(
|
||||
tracing::info!("Proxying request for tenant {} ({})", tenant_id, path);
|
||||
|
||||
// Find the node that holds shard zero
|
||||
let (node, tenant_shard_id) = service.tenant_shard0_node(tenant_id)?;
|
||||
let (node, tenant_shard_id) = service.tenant_shard0_node(tenant_id).await?;
|
||||
|
||||
// Callers will always pass an unsharded tenant ID. Before proxying, we must
|
||||
// rewrite this to a shard-aware shard zero ID.
|
||||
@@ -545,10 +545,10 @@ async fn handle_tenant_timeline_passthrough(
|
||||
let _timer = latency.start_timer(labels.clone());
|
||||
|
||||
let client = mgmt_api::Client::new(node.base_url(), service.get_config().jwt_token.as_deref());
|
||||
let resp = client.get_raw(path).await.map_err(|_e|
|
||||
// FIXME: give APiError a proper Unavailable variant. We return 503 here because
|
||||
// if we can't successfully send a request to the pageserver, we aren't available.
|
||||
ApiError::ShuttingDown)?;
|
||||
let resp = client.get_raw(path).await.map_err(|e|
|
||||
// We return 503 here because if we can't successfully send a request to the pageserver,
|
||||
// either we aren't available or the pageserver is unavailable.
|
||||
ApiError::ResourceUnavailable(format!("Error sending pageserver API request to {node}: {e}").into()))?;
|
||||
|
||||
if !resp.status().is_success() {
|
||||
let error_counter = &METRICS_REGISTRY
|
||||
@@ -557,6 +557,19 @@ async fn handle_tenant_timeline_passthrough(
|
||||
error_counter.inc(labels);
|
||||
}
|
||||
|
||||
// Transform 404 into 503 if we raced with a migration
|
||||
if resp.status() == reqwest::StatusCode::NOT_FOUND {
|
||||
// Look up node again: if we migrated it will be different
|
||||
let (new_node, _tenant_shard_id) = service.tenant_shard0_node(tenant_id).await?;
|
||||
if new_node.get_id() != node.get_id() {
|
||||
// Rather than retry here, send the client a 503 to prompt a retry: this matches
|
||||
// the pageserver's use of 503, and all clients calling this API should retry on 503.
|
||||
return Err(ApiError::ResourceUnavailable(
|
||||
format!("Pageserver {node} returned 404, was migrated to {new_node}").into(),
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
// We have a reqest::Response, would like a http::Response
|
||||
let mut builder = hyper::Response::builder().status(map_reqwest_hyper_status(resp.status())?);
|
||||
for (k, v) in resp.headers() {
|
||||
|
||||
@@ -2,8 +2,8 @@ use std::{str::FromStr, time::Duration};
|
||||
|
||||
use pageserver_api::{
|
||||
controller_api::{
|
||||
NodeAvailability, NodeDescribeResponse, NodeRegisterRequest, NodeSchedulingPolicy,
|
||||
TenantLocateResponseShard,
|
||||
AvailabilityZone, NodeAvailability, NodeDescribeResponse, NodeRegisterRequest,
|
||||
NodeSchedulingPolicy, TenantLocateResponseShard,
|
||||
},
|
||||
shard::TenantShardId,
|
||||
};
|
||||
@@ -36,7 +36,7 @@ pub(crate) struct Node {
|
||||
listen_pg_addr: String,
|
||||
listen_pg_port: u16,
|
||||
|
||||
availability_zone_id: String,
|
||||
availability_zone_id: AvailabilityZone,
|
||||
|
||||
// This cancellation token means "stop any RPCs in flight to this node, and don't start
|
||||
// any more". It is not related to process shutdown.
|
||||
@@ -64,8 +64,8 @@ impl Node {
|
||||
}
|
||||
|
||||
#[allow(unused)]
|
||||
pub(crate) fn get_availability_zone_id(&self) -> &str {
|
||||
self.availability_zone_id.as_str()
|
||||
pub(crate) fn get_availability_zone_id(&self) -> &AvailabilityZone {
|
||||
&self.availability_zone_id
|
||||
}
|
||||
|
||||
pub(crate) fn get_scheduling(&self) -> NodeSchedulingPolicy {
|
||||
@@ -181,7 +181,7 @@ impl Node {
|
||||
listen_http_port: u16,
|
||||
listen_pg_addr: String,
|
||||
listen_pg_port: u16,
|
||||
availability_zone_id: String,
|
||||
availability_zone_id: AvailabilityZone,
|
||||
) -> Self {
|
||||
Self {
|
||||
id,
|
||||
@@ -204,7 +204,7 @@ impl Node {
|
||||
listen_http_port: self.listen_http_port as i32,
|
||||
listen_pg_addr: self.listen_pg_addr.clone(),
|
||||
listen_pg_port: self.listen_pg_port as i32,
|
||||
availability_zone_id: self.availability_zone_id.clone(),
|
||||
availability_zone_id: self.availability_zone_id.0.clone(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -219,7 +219,7 @@ impl Node {
|
||||
listen_http_port: np.listen_http_port as u16,
|
||||
listen_pg_addr: np.listen_pg_addr,
|
||||
listen_pg_port: np.listen_pg_port as u16,
|
||||
availability_zone_id: np.availability_zone_id,
|
||||
availability_zone_id: AvailabilityZone(np.availability_zone_id),
|
||||
cancel: CancellationToken::new(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -9,6 +9,7 @@ use diesel::pg::PgConnection;
|
||||
use diesel::prelude::*;
|
||||
use diesel::Connection;
|
||||
use itertools::Itertools;
|
||||
use pageserver_api::controller_api::AvailabilityZone;
|
||||
use pageserver_api::controller_api::MetadataHealthRecord;
|
||||
use pageserver_api::controller_api::ShardSchedulingPolicy;
|
||||
use pageserver_api::controller_api::{NodeSchedulingPolicy, PlacementPolicy};
|
||||
@@ -667,8 +668,8 @@ impl Persistence {
|
||||
|
||||
pub(crate) async fn set_tenant_shard_preferred_azs(
|
||||
&self,
|
||||
preferred_azs: Vec<(TenantShardId, String)>,
|
||||
) -> DatabaseResult<Vec<(TenantShardId, String)>> {
|
||||
preferred_azs: Vec<(TenantShardId, AvailabilityZone)>,
|
||||
) -> DatabaseResult<Vec<(TenantShardId, AvailabilityZone)>> {
|
||||
use crate::schema::tenant_shards::dsl::*;
|
||||
|
||||
self.with_measured_conn(DatabaseOperation::SetPreferredAzs, move |conn| {
|
||||
@@ -679,7 +680,7 @@ impl Persistence {
|
||||
.filter(tenant_id.eq(tenant_shard_id.tenant_id.to_string()))
|
||||
.filter(shard_number.eq(tenant_shard_id.shard_number.0 as i32))
|
||||
.filter(shard_count.eq(tenant_shard_id.shard_count.literal() as i32))
|
||||
.set(preferred_az_id.eq(preferred_az))
|
||||
.set(preferred_az_id.eq(preferred_az.0.clone()))
|
||||
.execute(conn)?;
|
||||
|
||||
if updated == 1 {
|
||||
|
||||
@@ -463,7 +463,7 @@ impl Reconciler {
|
||||
for (timeline_id, baseline_lsn) in &baseline {
|
||||
match latest.get(timeline_id) {
|
||||
Some(latest_lsn) => {
|
||||
tracing::info!("🕑 LSN origin {baseline_lsn} vs destination {latest_lsn}");
|
||||
tracing::info!(timeline_id = %timeline_id, "🕑 LSN origin {baseline_lsn} vs destination {latest_lsn}");
|
||||
if latest_lsn < baseline_lsn {
|
||||
any_behind = true;
|
||||
}
|
||||
@@ -541,6 +541,8 @@ impl Reconciler {
|
||||
}
|
||||
}
|
||||
|
||||
pausable_failpoint!("reconciler-live-migrate-pre-generation-inc");
|
||||
|
||||
// Increment generation before attaching to new pageserver
|
||||
self.generation = Some(
|
||||
self.persistence
|
||||
@@ -617,6 +619,8 @@ impl Reconciler {
|
||||
},
|
||||
);
|
||||
|
||||
pausable_failpoint!("reconciler-live-migrate-post-detach");
|
||||
|
||||
tracing::info!("🔁 Switching to AttachedSingle mode on node {dest_ps}",);
|
||||
let dest_final_conf = build_location_config(
|
||||
&self.shard,
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
use crate::{node::Node, tenant_shard::TenantShard};
|
||||
use itertools::Itertools;
|
||||
use pageserver_api::models::PageserverUtilization;
|
||||
use pageserver_api::{controller_api::AvailabilityZone, models::PageserverUtilization};
|
||||
use serde::Serialize;
|
||||
use std::{collections::HashMap, fmt::Debug};
|
||||
use utils::{http::error::ApiError, id::NodeId};
|
||||
@@ -32,6 +32,8 @@ pub(crate) struct SchedulerNode {
|
||||
shard_count: usize,
|
||||
/// How many shards are currently attached on this node, via their [`crate::tenant_shard::IntentState`].
|
||||
attached_shard_count: usize,
|
||||
/// Availability zone id in which the node resides
|
||||
az: AvailabilityZone,
|
||||
|
||||
/// Whether this node is currently elegible to have new shards scheduled (this is derived
|
||||
/// from a node's availability state and scheduling policy).
|
||||
@@ -42,6 +44,7 @@ pub(crate) trait NodeSchedulingScore: Debug + Ord + Copy + Sized {
|
||||
fn generate(
|
||||
node_id: &NodeId,
|
||||
node: &mut SchedulerNode,
|
||||
preferred_az: &Option<AvailabilityZone>,
|
||||
context: &ScheduleContext,
|
||||
) -> Option<Self>;
|
||||
fn is_overloaded(&self) -> bool;
|
||||
@@ -62,6 +65,72 @@ impl ShardTag for SecondaryShardTag {
|
||||
type Score = NodeSecondarySchedulingScore;
|
||||
}
|
||||
#[derive(PartialEq, Eq, Debug, Clone, Copy)]
enum AzMatch {
Yes,
No,
Unknown,
}

impl AzMatch {
fn new(node_az: &AvailabilityZone, shard_preferred_az: Option<&AvailabilityZone>) -> Self {
match shard_preferred_az {
Some(preferred_az) if preferred_az == node_az => Self::Yes,
Some(_preferred_az) => Self::No,
None => Self::Unknown,
}
}
}

#[derive(PartialEq, Eq, Debug, Clone, Copy)]
struct AttachmentAzMatch(AzMatch);

impl Ord for AttachmentAzMatch {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
// Lower scores indicate a more suitable node.
// Note that we prefer a node for which we don't have
// info to a node which we are certain doesn't match the
// preferred AZ of the shard.
let az_match_score = |az_match: &AzMatch| match az_match {
AzMatch::Yes => 0,
AzMatch::Unknown => 1,
AzMatch::No => 2,
};

az_match_score(&self.0).cmp(&az_match_score(&other.0))
}
}

impl PartialOrd for AttachmentAzMatch {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
}
}

#[derive(PartialEq, Eq, Debug, Clone, Copy)]
struct SecondaryAzMatch(AzMatch);

impl Ord for SecondaryAzMatch {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
// Lower scores indicate a more suitable node.
// For secondary locations we wish to avoid the preferred AZ
// of the shard.
let az_match_score = |az_match: &AzMatch| match az_match {
AzMatch::No => 0,
AzMatch::Unknown => 1,
AzMatch::Yes => 2,
};

az_match_score(&self.0).cmp(&az_match_score(&other.0))
}
}

impl PartialOrd for SecondaryAzMatch {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
}
}
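Aside (not part of the diff): a quick standalone check of the two orderings defined above, written as plain score functions rather than the newtype Ord impls. Attachments prefer a matching AZ, secondaries prefer anything but the matching AZ, and "unknown" sits in the middle for both.

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum AzMatch {
    Yes,
    No,
    Unknown,
}

fn attachment_rank(m: AzMatch) -> u8 {
    match m {
        AzMatch::Yes => 0,
        AzMatch::Unknown => 1,
        AzMatch::No => 2,
    }
}

fn secondary_rank(m: AzMatch) -> u8 {
    match m {
        AzMatch::No => 0,
        AzMatch::Unknown => 1,
        AzMatch::Yes => 2,
    }
}

fn main() {
    // Lower rank means "schedule here first".
    assert!(attachment_rank(AzMatch::Yes) < attachment_rank(AzMatch::Unknown));
    assert!(attachment_rank(AzMatch::Unknown) < attachment_rank(AzMatch::No));

    assert!(secondary_rank(AzMatch::No) < secondary_rank(AzMatch::Unknown));
    assert!(secondary_rank(AzMatch::Unknown) < secondary_rank(AzMatch::Yes));

    println!("attachment prefers the matching AZ; secondary avoids it");
}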

/// Scheduling score of a given node for shard attachments.
/// Lower scores indicate more suitable nodes.
/// Ordering is given by member declaration order (top to bottom).
@@ -70,6 +139,10 @@ pub(crate) struct NodeAttachmentSchedulingScore {
/// The number of shards belonging to the tenant currently being
/// scheduled that are attached to this node.
affinity_score: AffinityScore,
/// Flag indicating whether this node matches the preferred AZ
/// of the shard. For equal affinity scores, nodes in the matching AZ
/// are considered first.
az_match: AttachmentAzMatch,
/// Size of [`ScheduleContext::attached_nodes`] for the current node.
/// This normally tracks the number of attached shards belonging to the
/// tenant being scheduled that are already on this node.
@@ -87,6 +160,7 @@ impl NodeSchedulingScore for NodeAttachmentSchedulingScore {
fn generate(
node_id: &NodeId,
node: &mut SchedulerNode,
preferred_az: &Option<AvailabilityZone>,
context: &ScheduleContext,
) -> Option<Self> {
let utilization = match &mut node.may_schedule {
@@ -102,6 +176,7 @@ impl NodeSchedulingScore for NodeAttachmentSchedulingScore {
.get(node_id)
.copied()
.unwrap_or(AffinityScore::FREE),
az_match: AttachmentAzMatch(AzMatch::new(&node.az, preferred_az.as_ref())),
attached_shards_in_context: context.attached_nodes.get(node_id).copied().unwrap_or(0),
utilization_score: utilization.cached_score(),
total_attached_shard_count: node.attached_shard_count,
@@ -123,6 +198,11 @@ impl NodeSchedulingScore for NodeAttachmentSchedulingScore {
/// Ordering is given by member declaration order (top to bottom).
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy)]
pub(crate) struct NodeSecondarySchedulingScore {
/// Flag indicating whether this node matches the preferred AZ
/// of the shard. For secondary locations we wish to avoid nodes in
/// the preferred AZ of the shard, since that's where the attached location
/// should be scheduled and having the secondary in the same AZ is bad for HA.
az_match: SecondaryAzMatch,
/// The number of shards belonging to the tenant currently being
/// scheduled that are attached to this node.
affinity_score: AffinityScore,
@@ -139,6 +219,7 @@ impl NodeSchedulingScore for NodeSecondarySchedulingScore {
fn generate(
node_id: &NodeId,
node: &mut SchedulerNode,
preferred_az: &Option<AvailabilityZone>,
context: &ScheduleContext,
) -> Option<Self> {
let utilization = match &mut node.may_schedule {
@@ -149,6 +230,7 @@ impl NodeSchedulingScore for NodeSecondarySchedulingScore {
};

Some(Self {
az_match: SecondaryAzMatch(AzMatch::new(&node.az, preferred_az.as_ref())),
affinity_score: context
.nodes
.get(node_id)
@@ -179,6 +261,7 @@ impl PartialEq for SchedulerNode {
may_schedule_matches
&& self.shard_count == other.shard_count
&& self.attached_shard_count == other.attached_shard_count
&& self.az == other.az
}
}
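Aside (not part of the diff): both score structs derive Ord, so comparison is lexicographic in field declaration order. That is why the two structs above differ mainly in where `az_match` sits: the secondary score ranks AZ placement above affinity, while the attachment score ranks affinity first. A tiny illustration with stand-alone types, not the real score structs:

#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy)]
struct AttachmentStyleScore {
    affinity: u32, // compared first
    az_penalty: u32,
}

#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy)]
struct SecondaryStyleScore {
    az_penalty: u32, // compared first
    affinity: u32,
}

fn main() {
    // Same underlying numbers, different winner depending on field order.
    let a = AttachmentStyleScore { affinity: 1, az_penalty: 2 };
    let b = AttachmentStyleScore { affinity: 2, az_penalty: 0 };
    assert!(a < b); // affinity dominates for attachments

    let c = SecondaryStyleScore { az_penalty: 2, affinity: 1 };
    let d = SecondaryStyleScore { az_penalty: 0, affinity: 2 };
    assert!(d < c); // AZ placement dominates for secondaries
    println!("field order decides priority");
}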
@@ -293,6 +376,7 @@ impl Scheduler {
shard_count: 0,
attached_shard_count: 0,
may_schedule: node.may_schedule(),
az: node.get_availability_zone_id().clone(),
},
);
}
@@ -319,6 +403,7 @@ impl Scheduler {
shard_count: 0,
attached_shard_count: 0,
may_schedule: node.may_schedule(),
az: node.get_availability_zone_id().clone(),
},
);
}
@@ -497,6 +582,7 @@ impl Scheduler {
shard_count: 0,
attached_shard_count: 0,
may_schedule: node.may_schedule(),
az: node.get_availability_zone_id().clone(),
});
}
}
@@ -542,6 +628,7 @@ impl Scheduler {
fn compute_node_scores<Score>(
&mut self,
hard_exclude: &[NodeId],
preferred_az: &Option<AvailabilityZone>,
context: &ScheduleContext,
) -> Vec<Score>
where
@@ -553,7 +640,7 @@ impl Scheduler {
if hard_exclude.contains(k) {
None
} else {
Score::generate(k, v, context)
Score::generate(k, v, preferred_az, context)
}
})
.collect()
@@ -571,13 +658,15 @@ impl Scheduler {
pub(crate) fn schedule_shard<Tag: ShardTag>(
&mut self,
hard_exclude: &[NodeId],
preferred_az: &Option<AvailabilityZone>,
context: &ScheduleContext,
) -> Result<NodeId, ScheduleError> {
if self.nodes.is_empty() {
return Err(ScheduleError::NoPageservers);
}

let mut scores = self.compute_node_scores::<Tag::Score>(hard_exclude, context);
let mut scores =
self.compute_node_scores::<Tag::Score>(hard_exclude, preferred_az, context);

// Exclude nodes whose utilization is critically high, if there are alternatives available. This will
// cause us to violate affinity rules if it is necessary to avoid critically overloading nodes: for example
@@ -634,6 +723,12 @@ impl Scheduler {
Ok(node_id)
}

/// Selects any available node. This is suitable for performing background work (e.g. S3
/// deletions).
pub(crate) fn any_available_node(&mut self) -> Result<NodeId, ScheduleError> {
self.schedule_shard::<AttachedShardTag>(&[], &None, &ScheduleContext::default())
}

/// Unit test access to internal state
#[cfg(test)]
pub(crate) fn get_node_shard_count(&self, node_id: NodeId) -> usize {
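Aside (not part of the diff): callers of `schedule_shard` now pass the shard's preferred AZ as a third argument, and `any_available_node` is just a convenience wrapper that schedules with no exclusions, no AZ preference and a default context. A trimmed-down sketch of that call shape; the types are stand-ins, not the real scheduler, and the tag generic is omitted:

#[derive(Clone, Debug, PartialEq, Eq)]
struct AvailabilityZone(String);
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
struct NodeId(u64);
#[derive(Default)]
struct ScheduleContext;
#[derive(Debug)]
enum ScheduleError {
    NoPageservers,
}

struct Scheduler {
    nodes: Vec<(NodeId, AvailabilityZone)>,
}

impl Scheduler {
    fn schedule_shard(
        &mut self,
        hard_exclude: &[NodeId],
        preferred_az: &Option<AvailabilityZone>,
        _context: &ScheduleContext,
    ) -> Result<NodeId, ScheduleError> {
        self.nodes
            .iter()
            .filter(|(id, _)| !hard_exclude.contains(id))
            // Nodes in the preferred AZ sort first (false < true).
            .min_by_key(|(_, az)| preferred_az.as_ref().map(|p| p != az).unwrap_or(false))
            .map(|(id, _)| *id)
            .ok_or(ScheduleError::NoPageservers)
    }

    fn any_available_node(&mut self) -> Result<NodeId, ScheduleError> {
        self.schedule_shard(&[], &None, &ScheduleContext::default())
    }
}

fn main() {
    let mut scheduler = Scheduler {
        nodes: vec![
            (NodeId(1), AvailabilityZone("az-a".into())),
            (NodeId(2), AvailabilityZone("az-b".into())),
        ],
    };
    let preferred = Some(AvailabilityZone("az-b".into()));
    println!("picked {:?} for preferred AZ az-b", scheduler.schedule_shard(&[], &preferred, &ScheduleContext::default()));
    println!("background work node: {:?}", scheduler.any_available_node());
}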
@@ -650,13 +745,22 @@ impl Scheduler {
pub(crate) mod test_utils {

use crate::node::Node;
use pageserver_api::{controller_api::NodeAvailability, models::utilization::test_utilization};
use pageserver_api::{
controller_api::{AvailabilityZone, NodeAvailability},
models::utilization::test_utilization,
};
use std::collections::HashMap;
use utils::id::NodeId;

/// Test helper: synthesize the requested number of nodes, all in active state.
///
/// Node IDs start at one.
pub(crate) fn make_test_nodes(n: u64) -> HashMap<NodeId, Node> {
///
/// The `azs` argument specifies the list of availability zones which will be assigned
/// to nodes in round-robin fashion. If empty, a default AZ is assigned.
pub(crate) fn make_test_nodes(n: u64, azs: &[AvailabilityZone]) -> HashMap<NodeId, Node> {
let mut az_iter = azs.iter().cycle();

(1..n + 1)
.map(|i| {
(NodeId(i), {
@@ -666,7 +770,10 @@ pub(crate) mod test_utils {
80 + i as u16,
format!("pghost-{i}"),
5432 + i as u16,
"test-az".to_string(),
az_iter
.next()
.cloned()
.unwrap_or(AvailabilityZone("test-az".to_string())),
);
node.set_availability(NodeAvailability::Active(test_utilization::simple(0, 0)));
assert!(node.is_available());
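Aside (not part of the diff): the updated helper assigns AZs round-robin from the provided slice, falling back to a default when the slice is empty. A standalone sketch of just that assignment logic (stand-in types; the real helper also builds full Node objects and marks them active):

#[derive(Clone, Debug, PartialEq, Eq)]
struct AvailabilityZone(String);

fn assign_azs(n: u64, azs: &[AvailabilityZone]) -> Vec<(u64, AvailabilityZone)> {
    let mut az_iter = azs.iter().cycle();
    (1..=n)
        .map(|i| {
            let az = az_iter
                .next()
                .cloned()
                .unwrap_or(AvailabilityZone("test-az".to_string()));
            (i, az)
        })
        .collect()
}

fn main() {
    let azs = [
        AvailabilityZone("az-a".to_string()),
        AvailabilityZone("az-b".to_string()),
    ];
    // Nodes 1 and 3 land in az-a, node 2 in az-b; an empty slice gives everyone "test-az".
    println!("{:?}", assign_azs(3, &azs));
    println!("{:?}", assign_azs(2, &[]));
}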
||||
@@ -686,7 +793,7 @@ mod tests {
|
||||
use crate::tenant_shard::IntentState;
|
||||
#[test]
|
||||
fn scheduler_basic() -> anyhow::Result<()> {
|
||||
let nodes = test_utils::make_test_nodes(2);
|
||||
let nodes = test_utils::make_test_nodes(2, &[]);
|
||||
|
||||
let mut scheduler = Scheduler::new(nodes.values());
|
||||
let mut t1_intent = IntentState::new();
|
||||
@@ -694,9 +801,9 @@ mod tests {
|
||||
|
||||
let context = ScheduleContext::default();
|
||||
|
||||
let scheduled = scheduler.schedule_shard::<AttachedShardTag>(&[], &context)?;
|
||||
let scheduled = scheduler.schedule_shard::<AttachedShardTag>(&[], &None, &context)?;
|
||||
t1_intent.set_attached(&mut scheduler, Some(scheduled));
|
||||
let scheduled = scheduler.schedule_shard::<AttachedShardTag>(&[], &context)?;
|
||||
let scheduled = scheduler.schedule_shard::<AttachedShardTag>(&[], &None, &context)?;
|
||||
t2_intent.set_attached(&mut scheduler, Some(scheduled));
|
||||
|
||||
assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 1);
|
||||
@@ -705,8 +812,11 @@ mod tests {
|
||||
assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 1);
|
||||
assert_eq!(scheduler.get_node_attached_shard_count(NodeId(2)), 1);
|
||||
|
||||
let scheduled =
|
||||
scheduler.schedule_shard::<AttachedShardTag>(&t1_intent.all_pageservers(), &context)?;
|
||||
let scheduled = scheduler.schedule_shard::<AttachedShardTag>(
|
||||
&t1_intent.all_pageservers(),
|
||||
&None,
|
||||
&context,
|
||||
)?;
|
||||
t1_intent.push_secondary(&mut scheduler, scheduled);
|
||||
|
||||
assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 1);
|
||||
@@ -746,7 +856,7 @@ mod tests {
|
||||
#[test]
|
||||
/// Test the PageserverUtilization's contribution to scheduling algorithm
|
||||
fn scheduler_utilization() {
|
||||
let mut nodes = test_utils::make_test_nodes(3);
|
||||
let mut nodes = test_utils::make_test_nodes(3, &[]);
|
||||
let mut scheduler = Scheduler::new(nodes.values());
|
||||
|
||||
// Need to keep these alive because they contribute to shard counts via RAII
|
||||
@@ -761,7 +871,7 @@ mod tests {
|
||||
context: &ScheduleContext,
|
||||
) {
|
||||
let scheduled = scheduler
|
||||
.schedule_shard::<AttachedShardTag>(&[], context)
|
||||
.schedule_shard::<AttachedShardTag>(&[], &None, context)
|
||||
.unwrap();
|
||||
let mut intent = IntentState::new();
|
||||
intent.set_attached(scheduler, Some(scheduled));
|
||||
@@ -870,4 +980,98 @@ mod tests {
|
||||
intent.clear(&mut scheduler);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
/// A simple test that showcases AZ-aware scheduling and its interaction with
|
||||
/// affinity scores.
|
||||
fn az_scheduling() {
|
||||
let az_a_tag = AvailabilityZone("az-a".to_string());
|
||||
let az_b_tag = AvailabilityZone("az-b".to_string());
|
||||
|
||||
let nodes = test_utils::make_test_nodes(3, &[az_a_tag.clone(), az_b_tag.clone()]);
|
||||
let mut scheduler = Scheduler::new(nodes.values());
|
||||
|
||||
// Need to keep these alive because they contribute to shard counts via RAII
|
||||
let mut scheduled_intents = Vec::new();
|
||||
|
||||
let mut context = ScheduleContext::default();
|
||||
|
||||
fn assert_scheduler_chooses<Tag: ShardTag>(
|
||||
expect_node: NodeId,
|
||||
preferred_az: Option<AvailabilityZone>,
|
||||
scheduled_intents: &mut Vec<IntentState>,
|
||||
scheduler: &mut Scheduler,
|
||||
context: &mut ScheduleContext,
|
||||
) {
|
||||
let scheduled = scheduler
|
||||
.schedule_shard::<Tag>(&[], &preferred_az, context)
|
||||
.unwrap();
|
||||
let mut intent = IntentState::new();
|
||||
intent.set_attached(scheduler, Some(scheduled));
|
||||
scheduled_intents.push(intent);
|
||||
assert_eq!(scheduled, expect_node);
|
||||
|
||||
context.avoid(&[scheduled]);
|
||||
}
|
||||
|
||||
assert_scheduler_chooses::<AttachedShardTag>(
|
||||
NodeId(1),
|
||||
Some(az_a_tag.clone()),
|
||||
&mut scheduled_intents,
|
||||
&mut scheduler,
|
||||
&mut context,
|
||||
);
|
||||
|
||||
// Node 2 and 3 have affinity score equal to 0, but node 3
|
||||
// is in "az-a" so we prefer that.
|
||||
assert_scheduler_chooses::<AttachedShardTag>(
|
||||
NodeId(3),
|
||||
Some(az_a_tag.clone()),
|
||||
&mut scheduled_intents,
|
||||
&mut scheduler,
|
||||
&mut context,
|
||||
);
|
||||
|
||||
// Node 2 is not in "az-a", but it has the lowest affinity so we prefer that.
|
||||
assert_scheduler_chooses::<AttachedShardTag>(
|
||||
NodeId(2),
|
||||
Some(az_a_tag.clone()),
|
||||
&mut scheduled_intents,
|
||||
&mut scheduler,
|
||||
&mut context,
|
||||
);
|
||||
|
||||
// Avoid nodes in "az-a" for the secondary location.
|
||||
assert_scheduler_chooses::<SecondaryShardTag>(
|
||||
NodeId(2),
|
||||
Some(az_a_tag.clone()),
|
||||
&mut scheduled_intents,
|
||||
&mut scheduler,
|
||||
&mut context,
|
||||
);
|
||||
|
||||
// Avoid nodes in "az-b" for the secondary location.
|
||||
// Nodes 1 and 3 are identically loaded, so prefer the lowest node id.
|
||||
assert_scheduler_chooses::<SecondaryShardTag>(
|
||||
NodeId(1),
|
||||
Some(az_b_tag.clone()),
|
||||
&mut scheduled_intents,
|
||||
&mut scheduler,
|
||||
&mut context,
|
||||
);
|
||||
|
||||
// Avoid nodes in "az-b" for the secondary location.
|
||||
// Node 3 has lower affinity score than 1, so prefer that.
|
||||
assert_scheduler_chooses::<SecondaryShardTag>(
|
||||
NodeId(3),
|
||||
Some(az_b_tag.clone()),
|
||||
&mut scheduled_intents,
|
||||
&mut scheduler,
|
||||
&mut context,
|
||||
);
|
||||
|
||||
for mut intent in scheduled_intents {
|
||||
intent.clear(&mut scheduler);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -26,7 +26,7 @@ use crate::{
ShardGenerationState, TenantFilter,
},
reconciler::{ReconcileError, ReconcileUnits, ReconcilerConfig, ReconcilerConfigBuilder},
scheduler::{AttachedShardTag, MaySchedule, ScheduleContext, ScheduleError, ScheduleMode},
scheduler::{MaySchedule, ScheduleContext, ScheduleError, ScheduleMode},
tenant_shard::{
MigrateAttachment, ReconcileNeeded, ReconcilerStatus, ScheduleOptimization,
ScheduleOptimizationAction,
@@ -1265,6 +1265,8 @@ impl Service {

#[cfg(feature = "testing")]
{
use pageserver_api::controller_api::AvailabilityZone;

// Hack: insert scheduler state for all nodes referenced by shards, as compatibility
// tests only store the shards, not the nodes. The nodes will be loaded shortly
// after when pageservers start up and register.
@@ -1282,7 +1284,7 @@ impl Service {
123,
"".to_string(),
123,
"test_az".to_string(),
AvailabilityZone("test_az".to_string()),
);

scheduler.node_upsert(&node);
@@ -2099,7 +2101,7 @@ impl Service {
let az_id = locked
.nodes
.get(&resp.node_id)
.map(|n| n.get_availability_zone_id().to_string())?;
.map(|n| n.get_availability_zone_id().clone())?;

Some((resp.shard_id, az_id))
})
@@ -2629,8 +2631,7 @@ impl Service {
let scheduler = &mut locked.scheduler;
// Right now we only perform the operation on a single node without parallelization
// TODO fan out the operation to multiple nodes for better performance
let node_id =
scheduler.schedule_shard::<AttachedShardTag>(&[], &ScheduleContext::default())?;
let node_id = scheduler.any_available_node()?;
let node = locked
.nodes
.get(&node_id)
@@ -2816,8 +2817,7 @@ impl Service {

// Pick an arbitrary node to use for remote deletions (does not have to be where the tenant
// was attached, just has to be able to see the S3 content)
let node_id =
scheduler.schedule_shard::<AttachedShardTag>(&[], &ScheduleContext::default())?;
let node_id = scheduler.any_available_node()?;
let node = nodes
.get(&node_id)
.expect("Pageservers may not be deleted while lock is active");
@@ -3508,34 +3508,66 @@ impl Service {
|
||||
|
||||
/// When you need to send an HTTP request to the pageserver that holds shard0 of a tenant, this
|
||||
/// function looks up and returns node. If the tenant isn't found, returns Err(ApiError::NotFound)
|
||||
pub(crate) fn tenant_shard0_node(
|
||||
pub(crate) async fn tenant_shard0_node(
|
||||
&self,
|
||||
tenant_id: TenantId,
|
||||
) -> Result<(Node, TenantShardId), ApiError> {
|
||||
let locked = self.inner.read().unwrap();
|
||||
let Some((tenant_shard_id, shard)) = locked
|
||||
.tenants
|
||||
.range(TenantShardId::tenant_range(tenant_id))
|
||||
.next()
|
||||
// Look up in-memory state and maybe use the node from there.
|
||||
{
|
||||
let locked = self.inner.read().unwrap();
|
||||
let Some((tenant_shard_id, shard)) = locked
|
||||
.tenants
|
||||
.range(TenantShardId::tenant_range(tenant_id))
|
||||
.next()
|
||||
else {
|
||||
return Err(ApiError::NotFound(
|
||||
anyhow::anyhow!("Tenant {tenant_id} not found").into(),
|
||||
));
|
||||
};
|
||||
|
||||
let Some(intent_node_id) = shard.intent.get_attached() else {
|
||||
tracing::warn!(
|
||||
tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(),
|
||||
"Shard not scheduled (policy {:?}), cannot generate pass-through URL",
|
||||
shard.policy
|
||||
);
|
||||
return Err(ApiError::Conflict(
|
||||
"Cannot call timeline API on non-attached tenant".to_string(),
|
||||
));
|
||||
};
|
||||
|
||||
if shard.reconciler.is_none() {
|
||||
// Optimization: while no reconcile is in flight, we may trust our in-memory state
|
||||
// to tell us which pageserver to use. Otherwise we will fall through and hit the database
|
||||
let Some(node) = locked.nodes.get(intent_node_id) else {
|
||||
// This should never happen
|
||||
return Err(ApiError::InternalServerError(anyhow::anyhow!(
|
||||
"Shard refers to nonexistent node"
|
||||
)));
|
||||
};
|
||||
return Ok((node.clone(), *tenant_shard_id));
|
||||
}
|
||||
};
|
||||
|
||||
// Look up the latest attached pageserver location from the database
|
||||
// generation state: this will reflect the progress of any ongoing migration.
|
||||
// Note that it is not guaranteed to _stay_ here, our caller must still handle
|
||||
// the case where they call through to the pageserver and get a 404.
|
||||
let db_result = self.persistence.tenant_generations(tenant_id).await?;
|
||||
let Some(ShardGenerationState {
|
||||
tenant_shard_id,
|
||||
generation: _,
|
||||
generation_pageserver: Some(node_id),
|
||||
}) = db_result.first()
|
||||
else {
|
||||
return Err(ApiError::NotFound(
|
||||
anyhow::anyhow!("Tenant {tenant_id} not found").into(),
|
||||
// This can happen if we raced with a tenant deletion or a shard split. On a retry
|
||||
// the caller will either succeed (shard split case), get a proper 404 (deletion case),
|
||||
// or a conflict response (case where tenant was detached in background)
|
||||
return Err(ApiError::ResourceUnavailable(
|
||||
"Shard {} not found in database, or is not attached".into(),
|
||||
));
|
||||
};
|
||||
|
||||
// TODO: should use the ID last published to compute_hook, rather than the intent: the intent might
|
||||
// point to somewhere we haven't attached yet.
|
||||
let Some(node_id) = shard.intent.get_attached() else {
|
||||
tracing::warn!(
|
||||
tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(),
|
||||
"Shard not scheduled (policy {:?}), cannot generate pass-through URL",
|
||||
shard.policy
|
||||
);
|
||||
return Err(ApiError::Conflict(
|
||||
"Cannot call timeline API on non-attached tenant".to_string(),
|
||||
));
|
||||
};
|
||||
|
||||
let locked = self.inner.read().unwrap();
|
||||
let Some(node) = locked.nodes.get(node_id) else {
|
||||
// This should never happen
|
||||
return Err(ApiError::InternalServerError(anyhow::anyhow!(
|
||||
@@ -4481,7 +4513,7 @@ impl Service {
let az_id = locked
.nodes
.get(node_id)
.map(|n| n.get_availability_zone_id().to_string())?;
.map(|n| n.get_availability_zone_id().clone())?;

Some((*tid, az_id))
})
@@ -15,7 +15,7 @@ use crate::{
service::ReconcileResultRequest,
};
use pageserver_api::controller_api::{
NodeSchedulingPolicy, PlacementPolicy, ShardSchedulingPolicy,
AvailabilityZone, NodeSchedulingPolicy, PlacementPolicy, ShardSchedulingPolicy,
};
use pageserver_api::{
models::{LocationConfig, LocationConfigMode, TenantConfig},
@@ -146,7 +146,7 @@ pub(crate) struct TenantShard {

// We should attempt to schedule this shard in the provided AZ to
// decrease chances of cross-AZ compute.
preferred_az_id: Option<String>,
preferred_az_id: Option<AvailabilityZone>,
}

#[derive(Default, Clone, Debug, Serialize)]
@@ -540,8 +540,11 @@ impl TenantShard {
Ok((true, promote_secondary))
} else {
// Pick a fresh node: either we had no secondaries or none were schedulable
let node_id =
scheduler.schedule_shard::<AttachedShardTag>(&self.intent.secondary, context)?;
let node_id = scheduler.schedule_shard::<AttachedShardTag>(
&self.intent.secondary,
&self.preferred_az_id,
context,
)?;
tracing::debug!("Selected {} as attached", node_id);
self.intent.set_attached(scheduler, Some(node_id));
Ok((true, node_id))
@@ -622,8 +625,11 @@ impl TenantShard {

let mut used_pageservers = vec![attached_node_id];
while self.intent.secondary.len() < secondary_count {
let node_id = scheduler
.schedule_shard::<SecondaryShardTag>(&used_pageservers, context)?;
let node_id = scheduler.schedule_shard::<SecondaryShardTag>(
&used_pageservers,
&self.preferred_az_id,
context,
)?;
self.intent.push_secondary(scheduler, node_id);
used_pageservers.push(node_id);
modified = true;
@@ -636,7 +642,11 @@ impl TenantShard {
modified = true;
} else if self.intent.secondary.is_empty() {
// Populate secondary by scheduling a fresh node
let node_id = scheduler.schedule_shard::<SecondaryShardTag>(&[], context)?;
let node_id = scheduler.schedule_shard::<SecondaryShardTag>(
&[],
&self.preferred_az_id,
context,
)?;
self.intent.push_secondary(scheduler, node_id);
modified = true;
}
@@ -815,6 +825,7 @@ impl TenantShard {
// with lower utilization.
let Ok(candidate_node) = scheduler.schedule_shard::<SecondaryShardTag>(
&self.intent.all_pageservers(),
&self.preferred_az_id,
schedule_context,
) else {
// A scheduling error means we have no possible candidate replacements
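Aside (not part of the diff): in the secondary-placement loop above, every chosen node is appended to `used_pageservers`, which is the hard-exclude list for the next pick, so a shard never gets two locations on the same pageserver. A standalone sketch of that exclusion pattern (stand-in types, trivial picking logic):

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
struct NodeId(u64);

fn pick_node(candidates: &[NodeId], hard_exclude: &[NodeId]) -> Option<NodeId> {
    candidates.iter().copied().find(|n| !hard_exclude.contains(n))
}

fn main() {
    let candidates = [NodeId(1), NodeId(2), NodeId(3)];
    let attached = NodeId(1);

    let mut used_pageservers = vec![attached];
    let mut secondaries = Vec::new();
    let secondary_count = 2;

    while secondaries.len() < secondary_count {
        let node_id = pick_node(&candidates, &used_pageservers).expect("ran out of nodes");
        secondaries.push(node_id);
        used_pageservers.push(node_id);
    }

    // Attached on node 1, secondaries on nodes 2 and 3.
    println!("attached={attached:?} secondaries={secondaries:?}");
}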
@@ -1313,7 +1324,7 @@ impl TenantShard {
pending_compute_notification: false,
delayed_reconcile: false,
scheduling_policy: serde_json::from_str(&tsp.scheduling_policy).unwrap(),
preferred_az_id: tsp.preferred_az_id,
preferred_az_id: tsp.preferred_az_id.map(AvailabilityZone),
})
}

@@ -1329,15 +1340,15 @@ impl TenantShard {
config: serde_json::to_string(&self.config).unwrap(),
splitting: SplitState::default(),
scheduling_policy: serde_json::to_string(&self.scheduling_policy).unwrap(),
preferred_az_id: self.preferred_az_id.clone(),
preferred_az_id: self.preferred_az_id.as_ref().map(|az| az.0.clone()),
}
}

pub(crate) fn preferred_az(&self) -> Option<&str> {
self.preferred_az_id.as_deref()
pub(crate) fn preferred_az(&self) -> Option<&AvailabilityZone> {
self.preferred_az_id.as_ref()
}

pub(crate) fn set_preferred_az(&mut self, preferred_az_id: String) {
pub(crate) fn set_preferred_az(&mut self, preferred_az_id: AvailabilityZone) {
self.preferred_az_id = Some(preferred_az_id);
}
}
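Aside (not part of the diff): the two conversions in the hunks above are the boundary between storage and memory. The persistent row keeps a plain Option&lt;String&gt;, while the in-memory shard now holds Option&lt;AvailabilityZone&gt;, so loading maps the String into the newtype and storing pulls the inner String back out. A standalone sketch with stand-in types, not the real TenantShardPersistence:

#[derive(Clone, Debug, PartialEq, Eq)]
struct AvailabilityZone(String);

struct PersistentRow {
    preferred_az_id: Option<String>,
}

struct InMemoryShard {
    preferred_az_id: Option<AvailabilityZone>,
}

fn load(row: PersistentRow) -> InMemoryShard {
    InMemoryShard {
        // Tuple-struct constructors are plain functions, so they can be passed to map.
        preferred_az_id: row.preferred_az_id.map(AvailabilityZone),
    }
}

fn store(shard: &InMemoryShard) -> PersistentRow {
    PersistentRow {
        preferred_az_id: shard.preferred_az_id.as_ref().map(|az| az.0.clone()),
    }
}

fn main() {
    let row = PersistentRow { preferred_az_id: Some("az-a".to_string()) };
    let shard = load(row);
    assert_eq!(shard.preferred_az_id, Some(AvailabilityZone("az-a".to_string())));
    assert_eq!(store(&shard).preferred_az_id, Some("az-a".to_string()));
    println!("round trip ok");
}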
@@ -1350,6 +1361,7 @@ pub(crate) mod tests {
|
||||
controller_api::NodeAvailability,
|
||||
shard::{ShardCount, ShardNumber},
|
||||
};
|
||||
use rand::{rngs::StdRng, SeedableRng};
|
||||
use utils::id::TenantId;
|
||||
|
||||
use crate::scheduler::test_utils::make_test_nodes;
|
||||
@@ -1378,7 +1390,11 @@ pub(crate) mod tests {
|
||||
)
|
||||
}
|
||||
|
||||
fn make_test_tenant(policy: PlacementPolicy, shard_count: ShardCount) -> Vec<TenantShard> {
|
||||
fn make_test_tenant(
|
||||
policy: PlacementPolicy,
|
||||
shard_count: ShardCount,
|
||||
preferred_az: Option<AvailabilityZone>,
|
||||
) -> Vec<TenantShard> {
|
||||
let tenant_id = TenantId::generate();
|
||||
|
||||
(0..shard_count.count())
|
||||
@@ -1390,7 +1406,7 @@ pub(crate) mod tests {
|
||||
shard_number,
|
||||
shard_count,
|
||||
};
|
||||
TenantShard::new(
|
||||
let mut ts = TenantShard::new(
|
||||
tenant_shard_id,
|
||||
ShardIdentity::new(
|
||||
shard_number,
|
||||
@@ -1399,7 +1415,13 @@ pub(crate) mod tests {
|
||||
)
|
||||
.unwrap(),
|
||||
policy.clone(),
|
||||
)
|
||||
);
|
||||
|
||||
if let Some(az) = &preferred_az {
|
||||
ts.set_preferred_az(az.clone());
|
||||
}
|
||||
|
||||
ts
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
@@ -1410,7 +1432,7 @@ pub(crate) mod tests {
|
||||
fn tenant_ha_scheduling() -> anyhow::Result<()> {
|
||||
// Start with three nodes. Our tenant will only use two. The third one is
|
||||
// expected to remain unused.
|
||||
let mut nodes = make_test_nodes(3);
|
||||
let mut nodes = make_test_nodes(3, &[]);
|
||||
|
||||
let mut scheduler = Scheduler::new(nodes.values());
|
||||
let mut context = ScheduleContext::default();
|
||||
@@ -1462,7 +1484,7 @@ pub(crate) mod tests {
|
||||
|
||||
#[test]
|
||||
fn intent_from_observed() -> anyhow::Result<()> {
|
||||
let nodes = make_test_nodes(3);
|
||||
let nodes = make_test_nodes(3, &[]);
|
||||
let mut scheduler = Scheduler::new(nodes.values());
|
||||
|
||||
let mut tenant_shard = make_test_tenant_shard(PlacementPolicy::Attached(1));
|
||||
@@ -1512,7 +1534,7 @@ pub(crate) mod tests {
|
||||
|
||||
#[test]
|
||||
fn scheduling_mode() -> anyhow::Result<()> {
|
||||
let nodes = make_test_nodes(3);
|
||||
let nodes = make_test_nodes(3, &[]);
|
||||
let mut scheduler = Scheduler::new(nodes.values());
|
||||
|
||||
let mut tenant_shard = make_test_tenant_shard(PlacementPolicy::Attached(1));
|
||||
@@ -1537,7 +1559,7 @@ pub(crate) mod tests {
|
||||
|
||||
#[test]
|
||||
fn optimize_attachment() -> anyhow::Result<()> {
|
||||
let nodes = make_test_nodes(3);
|
||||
let nodes = make_test_nodes(3, &[]);
|
||||
let mut scheduler = Scheduler::new(nodes.values());
|
||||
|
||||
let mut shard_a = make_test_tenant_shard(PlacementPolicy::Attached(1));
|
||||
@@ -1604,7 +1626,7 @@ pub(crate) mod tests {
|
||||
|
||||
#[test]
|
||||
fn optimize_secondary() -> anyhow::Result<()> {
|
||||
let nodes = make_test_nodes(4);
|
||||
let nodes = make_test_nodes(4, &[]);
|
||||
let mut scheduler = Scheduler::new(nodes.values());
|
||||
|
||||
let mut shard_a = make_test_tenant_shard(PlacementPolicy::Attached(1));
|
||||
@@ -1703,14 +1725,14 @@ pub(crate) mod tests {
|
||||
/// that it converges.
|
||||
#[test]
|
||||
fn optimize_add_nodes() -> anyhow::Result<()> {
|
||||
let nodes = make_test_nodes(4);
|
||||
let nodes = make_test_nodes(4, &[]);
|
||||
|
||||
// Only show the scheduler a couple of nodes
|
||||
let mut scheduler = Scheduler::new([].iter());
|
||||
scheduler.node_upsert(nodes.get(&NodeId(1)).unwrap());
|
||||
scheduler.node_upsert(nodes.get(&NodeId(2)).unwrap());
|
||||
|
||||
let mut shards = make_test_tenant(PlacementPolicy::Attached(1), ShardCount::new(4));
|
||||
let mut shards = make_test_tenant(PlacementPolicy::Attached(1), ShardCount::new(4), None);
|
||||
let mut schedule_context = ScheduleContext::default();
|
||||
for shard in &mut shards {
|
||||
assert!(shard
|
||||
@@ -1759,16 +1781,16 @@ pub(crate) mod tests {
|
||||
fn initial_scheduling_is_optimal() -> anyhow::Result<()> {
|
||||
use itertools::Itertools;
|
||||
|
||||
let nodes = make_test_nodes(2);
|
||||
let nodes = make_test_nodes(2, &[]);
|
||||
|
||||
let mut scheduler = Scheduler::new([].iter());
|
||||
scheduler.node_upsert(nodes.get(&NodeId(1)).unwrap());
|
||||
scheduler.node_upsert(nodes.get(&NodeId(2)).unwrap());
|
||||
|
||||
let mut a = make_test_tenant(PlacementPolicy::Attached(1), ShardCount::new(4));
|
||||
let mut a = make_test_tenant(PlacementPolicy::Attached(1), ShardCount::new(4), None);
|
||||
let a_context = Rc::new(RefCell::new(ScheduleContext::default()));
|
||||
|
||||
let mut b = make_test_tenant(PlacementPolicy::Attached(1), ShardCount::new(4));
|
||||
let mut b = make_test_tenant(PlacementPolicy::Attached(1), ShardCount::new(4), None);
|
||||
let b_context = Rc::new(RefCell::new(ScheduleContext::default()));
|
||||
|
||||
let a_shards_with_context = a.iter_mut().map(|shard| (shard, a_context.clone()));
|
||||
@@ -1793,4 +1815,147 @@ pub(crate) mod tests {
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn random_az_shard_scheduling() -> anyhow::Result<()> {
|
||||
use rand::seq::SliceRandom;
|
||||
|
||||
for seed in 0..50 {
|
||||
eprintln!("Running test with seed {seed}");
|
||||
let mut rng = StdRng::seed_from_u64(seed);
|
||||
|
||||
let az_a_tag = AvailabilityZone("az-a".to_string());
|
||||
let az_b_tag = AvailabilityZone("az-b".to_string());
|
||||
let azs = [az_a_tag, az_b_tag];
|
||||
let nodes = make_test_nodes(4, &azs);
|
||||
let mut shards_per_az: HashMap<AvailabilityZone, u32> = HashMap::new();
|
||||
|
||||
let mut scheduler = Scheduler::new([].iter());
|
||||
for node in nodes.values() {
|
||||
scheduler.node_upsert(node);
|
||||
}
|
||||
|
||||
let mut shards = Vec::default();
|
||||
let mut contexts = Vec::default();
|
||||
let mut az_picker = azs.iter().cycle().cloned();
|
||||
for i in 0..100 {
|
||||
let az = az_picker.next().unwrap();
|
||||
let shard_count = i % 4 + 1;
|
||||
*shards_per_az.entry(az.clone()).or_default() += shard_count;
|
||||
|
||||
let tenant_shards = make_test_tenant(
|
||||
PlacementPolicy::Attached(1),
|
||||
ShardCount::new(shard_count.try_into().unwrap()),
|
||||
Some(az),
|
||||
);
|
||||
let context = Rc::new(RefCell::new(ScheduleContext::default()));
|
||||
|
||||
contexts.push(context.clone());
|
||||
let with_ctx = tenant_shards
|
||||
.into_iter()
|
||||
.map(|shard| (shard, context.clone()));
|
||||
for shard_with_ctx in with_ctx {
|
||||
shards.push(shard_with_ctx);
|
||||
}
|
||||
}
|
||||
|
||||
shards.shuffle(&mut rng);
|
||||
|
||||
#[derive(Default, Debug)]
|
||||
struct NodeStats {
|
||||
attachments: u32,
|
||||
secondaries: u32,
|
||||
}
|
||||
|
||||
let mut node_stats: HashMap<NodeId, NodeStats> = HashMap::default();
|
||||
let mut attachments_in_wrong_az = 0;
|
||||
let mut secondaries_in_wrong_az = 0;
|
||||
|
||||
for (shard, context) in &mut shards {
|
||||
let context = &mut *context.borrow_mut();
|
||||
shard.schedule(&mut scheduler, context).unwrap();
|
||||
|
||||
let attached_node = shard.intent.get_attached().unwrap();
|
||||
let stats = node_stats.entry(attached_node).or_default();
|
||||
stats.attachments += 1;
|
||||
|
||||
let secondary_node = *shard.intent.get_secondary().first().unwrap();
|
||||
let stats = node_stats.entry(secondary_node).or_default();
|
||||
stats.secondaries += 1;
|
||||
|
||||
let attached_node_az = nodes
|
||||
.get(&attached_node)
|
||||
.unwrap()
|
||||
.get_availability_zone_id();
|
||||
let secondary_node_az = nodes
|
||||
.get(&secondary_node)
|
||||
.unwrap()
|
||||
.get_availability_zone_id();
|
||||
let preferred_az = shard.preferred_az().unwrap();
|
||||
|
||||
if attached_node_az != preferred_az {
|
||||
eprintln!(
|
||||
"{} attachment was scheduled in AZ {} but preferred AZ {}",
|
||||
shard.tenant_shard_id, attached_node_az, preferred_az
|
||||
);
|
||||
attachments_in_wrong_az += 1;
|
||||
}
|
||||
|
||||
if secondary_node_az == preferred_az {
|
||||
eprintln!(
|
||||
"{} secondary was scheduled in AZ {} which matches preference",
|
||||
shard.tenant_shard_id, attached_node_az
|
||||
);
|
||||
secondaries_in_wrong_az += 1;
|
||||
}
|
||||
}
|
||||
|
||||
let mut violations = Vec::default();
|
||||
|
||||
if attachments_in_wrong_az > 0 {
|
||||
violations.push(format!(
|
||||
"{} attachments scheduled to the incorrect AZ",
|
||||
attachments_in_wrong_az
|
||||
));
|
||||
}
|
||||
|
||||
if secondaries_in_wrong_az > 0 {
|
||||
violations.push(format!(
|
||||
"{} secondaries scheduled to the incorrect AZ",
|
||||
secondaries_in_wrong_az
|
||||
));
|
||||
}
|
||||
|
||||
eprintln!(
|
||||
"attachments_in_wrong_az={} secondaries_in_wrong_az={}",
|
||||
attachments_in_wrong_az, secondaries_in_wrong_az
|
||||
);
|
||||
|
||||
for (node_id, stats) in &node_stats {
|
||||
let node_az = nodes.get(node_id).unwrap().get_availability_zone_id();
|
||||
let ideal_attachment_load = shards_per_az.get(node_az).unwrap() / 2;
|
||||
let allowed_attachment_load =
|
||||
(ideal_attachment_load - 1)..(ideal_attachment_load + 2);
|
||||
|
||||
if !allowed_attachment_load.contains(&stats.attachments) {
|
||||
violations.push(format!(
|
||||
"Found {} attachments on node {}, but expected {}",
|
||||
stats.attachments, node_id, ideal_attachment_load
|
||||
));
|
||||
}
|
||||
|
||||
eprintln!(
|
||||
"{}: attachments={} secondaries={} ideal_attachment_load={}",
|
||||
node_id, stats.attachments, stats.secondaries, ideal_attachment_load
|
||||
);
|
||||
}
|
||||
|
||||
assert!(violations.is_empty(), "{violations:?}");
|
||||
|
||||
for (mut shard, _ctx) in shards {
|
||||
shard.intent.clear(&mut scheduler);
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -8,7 +8,6 @@ license.workspace = true
aws-sdk-s3.workspace = true
either.workspace = true
anyhow.workspace = true
git-version.workspace = true
hex.workspace = true
humantime.workspace = true
serde.workspace = true
@@ -642,9 +642,6 @@ class NeonEnvBuilder:
patch_script = ""
for ps in self.env.pageservers:
patch_script += f"UPDATE nodes SET listen_http_port={ps.service_port.http}, listen_pg_port={ps.service_port.pg} WHERE node_id = '{ps.id}';"
# This is temporary, to get the backward compat test happy
# since the compat snapshot was generated with an older version of neon local
patch_script += f"UPDATE nodes SET availability_zone_id='{ps.az_id}' WHERE node_id = '{ps.id}' AND availability_zone_id IS NULL;"
patch_script_path.write_text(patch_script)

# Update the config with info about tenants and timelines
@@ -122,6 +122,7 @@ def test_readonly_node_gc(neon_env_builder: NeonEnvBuilder):
Test static endpoint is protected from GC by acquiring and renewing lsn leases.
"""

LSN_LEASE_LENGTH = 8
neon_env_builder.num_pageservers = 2
# GC is manually triggered.
env = neon_env_builder.init_start(
@@ -139,7 +140,7 @@ def test_readonly_node_gc(neon_env_builder: NeonEnvBuilder):
"image_creation_threshold": "1",
"image_layer_creation_check_threshold": "0",
# Short lease length to fit test.
"lsn_lease_length": "3s",
"lsn_lease_length": f"{LSN_LEASE_LENGTH}s",
},
initial_tenant_shard_count=2,
)
@@ -170,10 +171,14 @@ def test_readonly_node_gc(neon_env_builder: NeonEnvBuilder):
with env.endpoints.create_start("main") as ep_main:
with ep_main.cursor() as cur:
cur.execute("CREATE TABLE t0(v0 int primary key, v1 text)")
lsn = None
lsn = Lsn(0)
for i in range(2):
lsn = generate_updates_on_main(env, ep_main, i)

# Round down to the closest LSN on page boundary (unnormalized).
XLOG_BLCKSZ = 8192
lsn = Lsn((int(lsn) // XLOG_BLCKSZ) * XLOG_BLCKSZ)

with env.endpoints.create_start(
branch_name="main",
endpoint_id="static",
@@ -183,7 +188,8 @@ def test_readonly_node_gc(neon_env_builder: NeonEnvBuilder):
cur.execute("SELECT count(*) FROM t0")
assert cur.fetchone() == (ROW_COUNT,)

time.sleep(3)
# Wait for static compute to renew lease at least once.
time.sleep(LSN_LEASE_LENGTH / 2)

generate_updates_on_main(env, ep_main, i, end=100)

@@ -204,8 +210,9 @@ def test_readonly_node_gc(neon_env_builder: NeonEnvBuilder):
# Do some update so we can increment latest_gc_cutoff
generate_updates_on_main(env, ep_main, i, end=100)

# Wait for the existing lease to expire.
time.sleep(LSN_LEASE_LENGTH)
# Now trigger GC again, layers should be removed.
time.sleep(4)
for shard, ps in tenant_get_shards(env, env.initial_tenant):
client = ps.http_client()
gc_result = client.timeline_gc(shard, env.initial_timeline, 0)
@@ -4,6 +4,7 @@ import threading
|
||||
import time
|
||||
from collections import defaultdict
|
||||
from datetime import datetime, timezone
|
||||
from enum import Enum
|
||||
from typing import Any, Dict, List, Optional, Set, Tuple, Union
|
||||
|
||||
import pytest
|
||||
@@ -2466,6 +2467,87 @@ def test_storage_controller_validate_during_migration(neon_env_builder: NeonEnvB
|
||||
raise
|
||||
|
||||
|
||||
class MigrationFailpoints(Enum):
|
||||
# While only the origin is attached
|
||||
PRE_GENERATION_INC = "reconciler-live-migrate-pre-generation-inc"
|
||||
# While both locations are attached
|
||||
POST_NOTIFY = "reconciler-live-migrate-post-notify"
|
||||
# While only the destination is attached
|
||||
POST_DETACH = "reconciler-live-migrate-post-detach"
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"migration_failpoint",
|
||||
[
|
||||
MigrationFailpoints.PRE_GENERATION_INC,
|
||||
MigrationFailpoints.POST_NOTIFY,
|
||||
MigrationFailpoints.POST_DETACH,
|
||||
],
|
||||
)
|
||||
def test_storage_controller_proxy_during_migration(
|
||||
neon_env_builder: NeonEnvBuilder, migration_failpoint: MigrationFailpoints
|
||||
):
|
||||
"""
|
||||
If we send a proxied GET request to the controller during a migration, it should route
|
||||
the request to whichever pageserver was most recently issued a generation.
|
||||
|
||||
Reproducer for https://github.com/neondatabase/neon/issues/9062
|
||||
"""
|
||||
neon_env_builder.num_pageservers = 2
|
||||
neon_env_builder.enable_pageserver_remote_storage(s3_storage())
|
||||
env = neon_env_builder.init_configs()
|
||||
env.start()
|
||||
|
||||
tenant_id = env.initial_tenant
|
||||
timeline_id = env.initial_timeline
|
||||
env.neon_cli.create_tenant(tenant_id, timeline_id)
|
||||
|
||||
# Activate a failpoint that will cause live migration to get stuck _after_ the generation has been issued
|
||||
# to the new pageserver: this should result in requests routed to the new pageserver.
|
||||
env.storage_controller.configure_failpoints((migration_failpoint.value, "pause"))
|
||||
|
||||
origin_pageserver = env.get_tenant_pageserver(tenant_id)
|
||||
dest_ps_id = [p.id for p in env.pageservers if p.id != origin_pageserver.id][0]
|
||||
|
||||
try:
|
||||
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
|
||||
migrate_fut = executor.submit(
|
||||
env.storage_controller.tenant_shard_migrate,
|
||||
TenantShardId(tenant_id, 0, 0),
|
||||
dest_ps_id,
|
||||
)
|
||||
|
||||
def has_hit_migration_failpoint():
|
||||
expr = f"at failpoint {str(migration_failpoint.value)}"
|
||||
log.info(expr)
|
||||
assert env.storage_controller.log_contains(expr)
|
||||
|
||||
wait_until(10, 1, has_hit_migration_failpoint)
|
||||
|
||||
# This request should be routed to whichever pageserver holds the highest generation
|
||||
tenant_info = env.storage_controller.pageserver_api().tenant_status(
|
||||
tenant_id,
|
||||
)
|
||||
|
||||
if migration_failpoint in (
|
||||
MigrationFailpoints.POST_NOTIFY,
|
||||
MigrationFailpoints.POST_DETACH,
|
||||
):
|
||||
# We expect request to land on the destination
|
||||
assert tenant_info["generation"] == 2
|
||||
elif migration_failpoint == MigrationFailpoints.PRE_GENERATION_INC:
|
||||
# We expect request to land on the origin
|
||||
assert tenant_info["generation"] == 1
|
||||
|
||||
# Eventually migration completes
|
||||
env.storage_controller.configure_failpoints((migration_failpoint.value, "off"))
|
||||
migrate_fut.result()
|
||||
except:
|
||||
# Always disable 'pause' failpoints, even on failure, to avoid hanging in shutdown
|
||||
env.storage_controller.configure_failpoints((migration_failpoint.value, "off"))
|
||||
raise
|
||||
|
||||
|
||||
@run_only_on_default_postgres("this is like a 'unit test' against storcon db")
def test_safekeeper_deployment_time_update(neon_env_builder: NeonEnvBuilder):
env = neon_env_builder.init_configs()
Block a user