Compare commits

..

79 Commits

Author SHA1 Message Date
a-masterov
f26987deef Merge branch 'main' into amasterov/random-ops-add-snapshots 2025-07-31 12:10:43 +02:00
Krzysztof Szafrański
f3ee6e818d [proxy] Correctly classify ConnectErrors (#12793)
As is, e.g. quota errors on wake compute are logged as "compute" errors.
2025-07-31 09:53:48 +00:00
Dmitrii Kovalkov
edd60730c8 safekeeper: use last_log_term in mconf switch + choose most advanced sk in pull timeline (#12778)
## Problem
I discovered two bugs corresponding to safekeeper migration, which
together might lead to a data loss during the migration. The second bug
is from a hadron patch and might lead to a data loss during the
safekeeper restore in hadron as well.

1. `switch_membership` returns the current `term` instead of
`last_log_term`. It is used to choose the `sync_position` in the
algorithm, so we might choose the wrong one and break the correctness
guarantees.
2. The current `term` is used to choose the most advanced SK in
`pull_timeline` with higher priority than `flush_lsn`. It is incorrect
because the most advanced safekeeper is the one with the highest
`(last_log_term, flush_lsn)` pair. The compute might bump term on the
least advanced sk, making it the best choice to pull from, and thus
making committed log entries "uncommitted" after `pull_timeline`

Part of https://databricks.atlassian.net/browse/LKB-1017

## Summary of changes
- Return `last_log_term` in `switch_membership`
- Use `(last_log_term, flush_lsn)` as a primary key for choosing the
most advanced sk in `pull_timeline` and deny pulling if the `max_term`
is higher than on the most advanced sk (hadron only)
- Write tests for both cases
- Retry `sync_safekeepers` in `compute_ctl`
- Take into the account the quorum size when calculating `sync_position`
2025-07-31 09:29:25 +00:00
Aleksandr Sarantsev
975b95f4cd Introduce deletion API improvement RFC (#12484)
## Problem

The deletion logic had become difficult to understand and maintain.

## Summary of changes

- Added an RFC detailing proposed improvements to all deletion-related
APIs.

---------

Co-authored-by: Aleksandr Sarantsev <aleksandr.sarantsev@databricks.com>
2025-07-31 08:34:47 +00:00
Mikhail
01c39f378e prewarm cancellation (#12785)
Add DELETE /lfc/prewarm route which handles ongoing prewarm
cancellation, update API spec, add prewarm Cancelled state
Add offload Cancelled state when LFC is not initialized
2025-07-30 22:05:51 +00:00
Dimitri Fontaine
4d3b28bd2e [Hadron] Always run databricks auth hook. (#12683) 2025-07-30 21:34:30 +00:00
Heikki Linnakangas
81ddd10be6 tests: Don't print Hostname on every test connection (#12782)
These lines are a significant fraction of the total log size of the
regression tests. And it seems very uninteresting, it's always
'localhost' in local tests.
2025-07-30 19:56:22 +00:00
Suhas Thalanki
e470997627 enable tests introduced in hadron commits (#12790)
Enables skipped tests introduced in hadron integration commits
2025-07-30 19:10:33 +00:00
Erik Grinaker
eb2741758b storcon: actually update gRPC address on reattach (#12784)
## Problem

In #12268, we added Pageserver gRPC addresses to the storage controller.
However, we didn't actually persist these in the database.

## Summary of changes

Update the database with the new gRPC address on reattach.
2025-07-30 16:18:35 +00:00
Matthias van de Meent
f3a0e4f255 Improve specificity with which we apply compute specs (#12773)
This makes sure we don't confuse user-controlled functions with PG's
builtin functions.

## Problem

See https://github.com/neondatabase/cloud/issues/31628
2025-07-30 15:29:16 +00:00
Suhas Thalanki
842a5091d5 [BRC-3051] Walproposer: Safekeeper quorum health metrics (#930) (#12750)
Today we don't have any indications (other than spammy logs in PG that
nobody monitors) if the Walproposer in PG cannot connect to/get votes
from all Safekeepers. This means we don't have signals indicating that
the Safekeepers are operating at degraded redundancy. We need these
signals.

Added plumbing in PG extension so that the `neon_perf_counters` view
exports the following gauge metrics on safekeeper health:
- `num_configured_safekeepers`: The total number of safekeepers
configured in PG.
- `num_active_safekeepers`: The number of safekeepers that PG is
actively streaming WAL to.

An alert should be raised whenever `num_active_safekeepers` <
`num_configured_safekeepers`.

The metrics are implemented by adding additional state to the
Walproposer shared memory keeping track of the active statuses of
safekeepers using a simple array. The status of the safekeeper is set to
active (1) after the Walproposer acquires a quorum and starts streaming
data to the safekeeper, and is set to inactive (0) when the connection
with a safekeeper is shut down. We scan the safekeeper status array in
Walproposer shared memory when collecting the metrics to produce results
for the gauges.

Added coverage for the metrics to integration test
`test_wal_acceptor.py::test_timeline_disk_usage_limit`.

## Problem

## Summary of changes

---------

Co-authored-by: William Huang <william.huang@databricks.com>
2025-07-30 15:14:59 +00:00
Alexey Masterov
7c2022c1b5 Modify weights 2025-07-23 15:54:39 +02:00
Alexey Masterov
c233deb1c2 Rename a method for consistency 2025-07-23 15:38:59 +02:00
Alexey Masterov
d550b67c5f Reword the comment 2025-07-23 15:27:36 +02:00
Alexey Masterov
2ca2b05ab5 Remove the redundant condition 2025-07-23 15:21:21 +02:00
Alexey Masterov
5e1057b860 Return sleep before retry 2025-07-23 15:17:01 +02:00
Alexey Masterov
bb6127f495 Update the comment 2025-07-23 15:16:02 +02:00
a-masterov
a41c00e7c1 Merge branch 'main' into amasterov/random-ops-add-snapshots 2025-07-23 15:15:27 +02:00
Alexey Masterov
76832488d0 Add a log with the new branch 2025-07-23 15:09:02 +02:00
Alexey Masterov
3ce2c15c10 Cleanup 2025-07-23 14:30:55 +02:00
Alexey Masterov
1c3f49e231 fix restored_from 2025-07-23 12:57:15 +02:00
Alexey Masterov
b982cf6c84 format 2025-07-23 12:18:27 +02:00
Alexey Masterov
b1b23cdc8e Do not mark a leaf a branch without a parent 2025-07-23 12:10:56 +02:00
Alexey Masterov
556e9cb781 Merge branch 'main' into amasterov/random-ops-add-snapshots
# Conflicts:
#	test_runner/random_ops/test_random_ops.py
2025-07-23 10:58:56 +02:00
Alexey Masterov
8edea1dea3 Do not consider restored branches as leaf ones 2025-07-22 13:41:09 +02:00
Alexey Masterov
f5a553a8e5 Add debug 2025-07-22 11:38:09 +02:00
Alexey Masterov
7423c393c6 Remove parent_id from the restored branch 2025-07-22 11:04:08 +02:00
Alexey Masterov
c3a7158e62 Add debug 2025-07-22 10:43:28 +02:00
Alexey Masterov
848dcd7540 Add debug 2025-07-21 17:00:15 +02:00
Alexey Masterov
783dfe3cce refactoring 2025-07-21 16:05:56 +02:00
Alexey Masterov
cdc2ea110f Cleanup 2025-07-18 16:38:25 +02:00
Alexey Masterov
c7e1183da4 Cleanup 2025-07-18 16:37:39 +02:00
Alexey Masterov
6763925a4d Run all the operations 2025-07-18 16:32:07 +02:00
Alexey Masterov
3bcdbe30f1 Avoid to manipulate restored snapshots 2025-07-18 16:09:46 +02:00
Alexey Masterov
22975426b7 10x more wait 2025-07-18 15:17:29 +02:00
Alexey Masterov
31c6f66a49 Wait before delete 2025-07-18 15:08:45 +02:00
Alexey Masterov
287e01fdf9 retry more 2025-07-18 15:06:52 +02:00
Alexey Masterov
91c81cc5e5 refactor 2025-07-18 14:52:39 +02:00
Alexey Masterov
a8354b0aa3 Delete projects 2025-07-18 14:44:26 +02:00
Alexey Masterov
1102e2aff0 Add connect_env 2025-07-18 14:42:28 +02:00
Alexey Masterov
f6a61c9492 Add commit 2025-07-18 14:08:15 +02:00
Alexey Masterov
cbf8e248fc Do not delete project after failure (debug only, do not merge!) 2025-07-18 09:39:04 +02:00
Alexey Masterov
f0f30076cc Do not delete project after failure (debug only, do not merge!) 2025-07-18 09:35:26 +02:00
Alexey Masterov
42544cf145 Add debug 2025-07-17 19:57:20 +02:00
Alexey Masterov
28b25092ad An attempt 5 2025-07-17 19:49:49 +02:00
Alexey Masterov
b77a1fae04 An attempt 4 2025-07-17 18:58:00 +02:00
Alexey Masterov
73ed7ade70 An attempt 3 2025-07-17 18:53:09 +02:00
Alexey Masterov
74626b94a8 An attempt 2 2025-07-17 18:48:44 +02:00
Alexey Masterov
4ca6d8cecf An attempt 2025-07-17 18:32:57 +02:00
Alexey Masterov
bf0be50df9 Add debug 2025-07-17 15:06:53 +02:00
Alexey Masterov
1adc95758e add the database 2025-07-17 14:52:27 +02:00
Alexey Masterov
03e994f9c7 Connection parameters 2025-07-17 14:40:25 +02:00
Alexey Masterov
f0671c996e Debug 2025-07-17 14:27:09 +02:00
Alexey Masterov
829cb5fe59 use connection parameters instead of connect URI 2025-07-17 14:25:53 +02:00
Alexey Masterov
561083524d finalize restore by default 2025-07-17 13:58:09 +02:00
Alexey Masterov
009303e31f Connect to the target branch, not the main one 2025-07-17 13:44:19 +02:00
Alexey Masterov
0e42cac589 Add debug 2025-07-17 12:48:08 +02:00
Alexey Masterov
f5cebcaf6a Wait for the snapshot to complete 2025-07-17 12:34:43 +02:00
Alexey Masterov
5861d0f9b2 Add the environment 2025-07-17 12:01:50 +02:00
Alexey Masterov
dbedf11191 Add check for snapshot sanity 2025-07-17 11:50:30 +02:00
Alexey Masterov
1e20c4f2b2 format 2025-07-16 13:26:02 +02:00
Alexey Masterov
018f95115a Retry on 423 error "snapshot is in transition" 2025-07-16 13:21:32 +02:00
Alexey Masterov
f222256225 Added a documentation for the new methods 2025-07-15 17:07:07 +02:00
Alexey Masterov
17b5f5e090 Merge remote-tracking branch 'origin/amasterov/random-ops-add' into amasterov/random-ops-add 2025-07-15 16:35:48 +02:00
Alexey Masterov
9bf5d69c01 Cleanup 2025-07-15 16:35:16 +02:00
a-masterov
f816b3d90e Merge branch 'main' into amasterov/random-ops-add 2025-07-15 16:20:14 +02:00
Alexey Masterov
1ec1a82d3d Start benchmark 2025-07-15 15:16:45 +02:00
Alexey Masterov
e97c1d2684 Fix the parameter error 2025-07-15 14:49:27 +02:00
a-masterov
94cfd3f22e Merge branch 'main' into amasterov/random-ops-add 2025-07-15 12:09:32 +02:00
Alexey Masterov
f45ea8fe6b Add snapshots 2025-07-15 12:08:38 +02:00
Alexey Masterov
1443ba65d3 Add reset_to_parent 2025-07-15 12:08:38 +02:00
Alexey Masterov
185f4de0fe refactoring 2025-07-09 19:22:27 +02:00
Alexey Masterov
efb08f82cd Merge branch 'main' into amasterov/random-ops-add
# Conflicts:
#	test_runner/random_ops/test_random_ops.py
2025-07-09 18:59:43 +02:00
Alexey Masterov
c31563f551 formatting 2025-07-09 14:54:13 +02:00
Alexey Masterov
fd6c2cba01 Merge branch 'amasterov/workaround-branch-not-found-problem' into amasterov/random-ops-add
# Conflicts:
#	test_runner/random_ops/test_random_ops.py
2025-07-09 14:53:05 +02:00
Alexey Masterov
899f4a1e77 remove the duplicate log message 2025-07-09 13:58:13 +02:00
Alexey Masterov
e95fcfa0d5 Add retrying if parent branch is not found.
Add clarification and a link to Jira issue.
2025-07-09 13:54:54 +02:00
a-masterov
0ccc649299 Merge branch 'main' into amasterov/random-ops-add 2025-07-08 11:34:39 +02:00
Alexey Masterov
fe2abf3531 fix a mypy discovered error 2025-06-12 10:48:04 +02:00
86 changed files with 1365 additions and 327 deletions

View File

@@ -13,12 +13,6 @@ on:
# │ │ │ │ ┌───────────── day of the week (0 - 6 or SUN-SAT)
# - cron: '0 5 * * *' # Runs at 5 UTC once a day
workflow_dispatch: # adds an ability to run this manually
inputs:
commit_hash:
type: string
description: 'The long neon repo commit hash for the system under test (proxy) to be tested.'
required: false
default: ''
defaults:
run:
@@ -39,14 +33,20 @@ jobs:
contents: write
pull-requests: write
runs-on: [ self-hosted, unit-perf-aws-arm ]
timeout-minutes: 60 # 1h timeout
container:
image: ghcr.io/neondatabase/build-tools:pinned-bookworm
credentials:
username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }}
options: --init
timeout-minutes: 60 # 1h timeout
steps:
- name: Checkout proxy-bench Repo
uses: actions/checkout@v4
with:
repository: neondatabase/proxy-bench
path: proxy-bench
- name: Set up the environment which depends on $RUNNER_TEMP on nvme drive
id: set-env
shell: bash -euxo pipefail {0}
@@ -54,64 +54,19 @@ jobs:
PROXY_BENCH_PATH=$(realpath ./proxy-bench)
{
echo "PROXY_BENCH_PATH=$PROXY_BENCH_PATH"
echo "NEON_DIR=${GITHUB_WORKSPACE}/"
echo "NEON_PROXY_PATH=${GITHUB_WORKSPACE}/bin/proxy"
echo "NEON_DIR=${RUNNER_TEMP}/neon"
echo "NEON_PROXY_PATH=${RUNNER_TEMP}/neon/bin/proxy"
echo "TEST_OUTPUT=${PROXY_BENCH_PATH}/test_output"
echo "DOCKER_COMPOSE_FILE=${PROXY_BENCH_PATH}/docker-compose.yml"
echo ""
} >> "$GITHUB_ENV"
- name: Determine commit hash
id: commit_hash
shell: bash -euxo pipefail {0}
env:
INPUT_COMMIT_HASH: ${{ github.event.inputs.commit_hash }}
run: |
if [[ -z "${INPUT_COMMIT_HASH}" ]]; then
COMMIT_HASH=$(curl -s https://api.github.com/repos/neondatabase/neon/commits/main | jq -r '.sha')
echo "COMMIT_HASH=$COMMIT_HASH" >> $GITHUB_ENV
echo "commit_hash=$COMMIT_HASH" >> "$GITHUB_OUTPUT"
echo "COMMIT_HASH_TYPE=latest" >> $GITHUB_ENV
else
COMMIT_HASH="${INPUT_COMMIT_HASH}"
echo "COMMIT_HASH=$COMMIT_HASH" >> $GITHUB_ENV
echo "commit_hash=$COMMIT_HASH" >> "$GITHUB_OUTPUT"
echo "COMMIT_HASH_TYPE=manual" >> $GITHUB_ENV
fi
- name: Checkout the neon repository at given commit hash
uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
with:
ref: ${{ steps.commit_hash.outputs.commit_hash }}
- name: Print GITHUB_WORKSPACE
run: echo "$GITHUB_WORKSPACE"
- name: List parent dir of workspace
run: ls -la /__w/neon
- name: List all env vars
run: env
- name: Checkout proxy-bench Repo
uses: actions/checkout@v4
with:
repository: neondatabase/proxy-bench
path: proxy-bench
- name: Cache poetry deps
uses: actions/cache@v4
with:
path: ~/.cache/pypoetry/virtualenvs
key: v2-${{ runner.os }}-${{ runner.arch }}-python-deps-bookworm-${{ hashFiles('poetry.lock') }}
- name: DEBUG List files for debugging
working-directory: ${{ env.NEON_DIR }}
run: |
pwd
ls -la
- name: Install Python deps
working-directory: ${{ env.NEON_DIR }}
shell: bash -euxo pipefail {0}
run: ./scripts/pysync
@@ -122,14 +77,14 @@ jobs:
- name: Run proxy-bench
working-directory: ${{ env.PROXY_BENCH_PATH }}
run: ./run.sh --bare-metal
run: ./run.sh --with-grafana --bare-metal
- name: Ingest Bench Results
if: always()
working-directory: ${{ env.NEON_DIR }}
run: |
mkdir -p $TEST_OUTPUT
python3 ./scripts/proxy_bench_results_ingest.py --out $TEST_OUTPUT
python $NEON_DIR/scripts/proxy_bench_results_ingest.py --out $TEST_OUTPUT
- name: Push Metrics to Proxy perf database
shell: bash -euxo pipefail {0}
@@ -154,4 +109,4 @@ jobs:
fi
if [[ -d "${PROXY_BENCH_PATH}/test_output" ]]; then
rm -rf ${PROXY_BENCH_PATH}/test_output
fi
fi

View File

@@ -1 +1 @@
SELECT num_requested AS checkpoints_req FROM pg_stat_checkpointer;
SELECT num_requested AS checkpoints_req FROM pg_catalog.pg_stat_checkpointer;

View File

@@ -1 +1 @@
SELECT checkpoints_req FROM pg_stat_bgwriter;
SELECT checkpoints_req FROM pg_catalog.pg_stat_bgwriter;

View File

@@ -1 +1 @@
SELECT checkpoints_timed FROM pg_stat_bgwriter;
SELECT checkpoints_timed FROM pg_catalog.pg_stat_bgwriter;

View File

@@ -1 +1 @@
SELECT (neon.backpressure_throttling_time()::float8 / 1000000) AS throttled;
SELECT (neon.backpressure_throttling_time()::pg_catalog.float8 / 1000000) AS throttled;

View File

@@ -1,4 +1,4 @@
SELECT CASE
WHEN pg_catalog.pg_is_in_recovery() THEN (pg_last_wal_replay_lsn() - '0/0')::FLOAT8
ELSE (pg_current_wal_lsn() - '0/0')::FLOAT8
WHEN pg_catalog.pg_is_in_recovery() THEN (pg_catalog.pg_last_wal_replay_lsn() - '0/0')::pg_catalog.FLOAT8
ELSE (pg_catalog.pg_current_wal_lsn() - '0/0')::pg_catalog.FLOAT8
END AS lsn;

View File

@@ -1,7 +1,7 @@
SELECT
(SELECT setting FROM pg_settings WHERE name = 'neon.timeline_id') AS timeline_id,
(SELECT setting FROM pg_catalog.pg_settings WHERE name = 'neon.timeline_id') AS timeline_id,
-- Postgres creates temporary snapshot files of the form %X-%X.snap.%d.tmp.
-- These temporary snapshot files are renamed to the actual snapshot files
-- after they are completely built. We only WAL-log the completely built
-- snapshot files
(SELECT COUNT(*) FROM pg_ls_dir('pg_logical/snapshots') AS name WHERE name LIKE '%.snap') AS num_logical_snapshot_files;
(SELECT COUNT(*) FROM pg_catalog.pg_ls_dir('pg_logical/snapshots') AS name WHERE name LIKE '%.snap') AS num_logical_snapshot_files;

View File

@@ -1,7 +1,7 @@
SELECT
(SELECT current_setting('neon.timeline_id')) AS timeline_id,
(SELECT pg_catalog.current_setting('neon.timeline_id')) AS timeline_id,
-- Postgres creates temporary snapshot files of the form %X-%X.snap.%d.tmp.
-- These temporary snapshot files are renamed to the actual snapshot files
-- after they are completely built. We only WAL-log the completely built
-- snapshot files
(SELECT COALESCE(sum(size), 0) FROM pg_ls_logicalsnapdir() WHERE name LIKE '%.snap') AS logical_snapshots_bytes;
(SELECT COALESCE(pg_catalog.sum(size), 0) FROM pg_catalog.pg_ls_logicalsnapdir() WHERE name LIKE '%.snap') AS logical_snapshots_bytes;

View File

@@ -1,9 +1,9 @@
SELECT
(SELECT setting FROM pg_settings WHERE name = 'neon.timeline_id') AS timeline_id,
(SELECT setting FROM pg_catalog.pg_settings WHERE name = 'neon.timeline_id') AS timeline_id,
-- Postgres creates temporary snapshot files of the form %X-%X.snap.%d.tmp.
-- These temporary snapshot files are renamed to the actual snapshot files
-- after they are completely built. We only WAL-log the completely built
-- snapshot files
(SELECT COALESCE(sum((pg_stat_file('pg_logical/snapshots/' || name, missing_ok => true)).size), 0)
FROM (SELECT * FROM pg_ls_dir('pg_logical/snapshots') WHERE pg_ls_dir LIKE '%.snap') AS name
(SELECT COALESCE(pg_catalog.sum((pg_catalog.pg_stat_file('pg_logical/snapshots/' || name, missing_ok => true)).size), 0)
FROM (SELECT * FROM pg_catalog.pg_ls_dir('pg_logical/snapshots') WHERE pg_ls_dir LIKE '%.snap') AS name
) AS logical_snapshots_bytes;

View File

@@ -1 +1 @@
SELECT current_setting('max_connections') as max_connections;
SELECT pg_catalog.current_setting('max_connections') AS max_connections;

View File

@@ -1,4 +1,4 @@
SELECT datname database_name,
age(datfrozenxid) frozen_xid_age
FROM pg_database
pg_catalog.age(datfrozenxid) frozen_xid_age
FROM pg_catalog.pg_database
ORDER BY frozen_xid_age DESC LIMIT 10;

View File

@@ -1,4 +1,4 @@
SELECT datname database_name,
mxid_age(datminmxid) min_mxid_age
FROM pg_database
pg_catalog.mxid_age(datminmxid) min_mxid_age
FROM pg_catalog.pg_database
ORDER BY min_mxid_age DESC LIMIT 10;

View File

@@ -1,4 +1,4 @@
SELECT CASE
WHEN pg_catalog.pg_is_in_recovery() THEN (pg_last_wal_receive_lsn() - '0/0')::FLOAT8
WHEN pg_catalog.pg_is_in_recovery() THEN (pg_catalog.pg_last_wal_receive_lsn() - '0/0')::pg_catalog.FLOAT8
ELSE 0
END AS lsn;

View File

@@ -1 +1 @@
SELECT subenabled::text AS enabled, count(*) AS subscriptions_count FROM pg_subscription GROUP BY subenabled;
SELECT subenabled::pg_catalog.text AS enabled, pg_catalog.count(*) AS subscriptions_count FROM pg_catalog.pg_subscription GROUP BY subenabled;

View File

@@ -1 +1 @@
SELECT datname, state, count(*) AS count FROM pg_stat_activity WHERE state <> '' GROUP BY datname, state;
SELECT datname, state, pg_catalog.count(*) AS count FROM pg_catalog.pg_stat_activity WHERE state <> '' GROUP BY datname, state;

View File

@@ -1,5 +1,5 @@
SELECT sum(pg_database_size(datname)) AS total
FROM pg_database
SELECT pg_catalog.sum(pg_catalog.pg_database_size(datname)) AS total
FROM pg_catalog.pg_database
-- Ignore invalid databases, as we will likely have problems with
-- getting their size from the Pageserver.
WHERE datconnlimit != -2;

View File

@@ -3,6 +3,6 @@
-- minutes.
SELECT
x::text as duration_seconds,
x::pg_catalog.text AS duration_seconds,
neon.approximate_working_set_size_seconds(x) AS size
FROM (SELECT generate_series * 60 AS x FROM generate_series(1, 60)) AS t (x);

View File

@@ -3,6 +3,6 @@
SELECT
x AS duration,
neon.approximate_working_set_size_seconds(extract('epoch' FROM x::interval)::int) AS size FROM (
neon.approximate_working_set_size_seconds(extract('epoch' FROM x::pg_catalog.interval)::pg_catalog.int4) AS size FROM (
VALUES ('5m'), ('15m'), ('1h')
) AS t (x);

View File

@@ -1 +1 @@
SELECT pg_size_bytes(current_setting('neon.file_cache_size_limit')) AS lfc_cache_size_limit;
SELECT pg_catalog.pg_size_bytes(pg_catalog.current_setting('neon.file_cache_size_limit')) AS lfc_cache_size_limit;

View File

@@ -1,3 +1,3 @@
SELECT slot_name, (restart_lsn - '0/0')::FLOAT8 as restart_lsn
FROM pg_replication_slots
SELECT slot_name, (restart_lsn - '0/0')::pg_catalog.FLOAT8 AS restart_lsn
FROM pg_catalog.pg_replication_slots
WHERE slot_type = 'logical';

View File

@@ -1 +1 @@
SELECT setting::int AS max_cluster_size FROM pg_settings WHERE name = 'neon.max_cluster_size';
SELECT setting::pg_catalog.int4 AS max_cluster_size FROM pg_catalog.pg_settings WHERE name = 'neon.max_cluster_size';

View File

@@ -1,13 +1,13 @@
-- We export stats for 10 non-system databases. Without this limit it is too
-- easy to abuse the system by creating lots of databases.
SELECT pg_database_size(datname) AS db_size,
SELECT pg_catalog.pg_database_size(datname) AS db_size,
deadlocks,
tup_inserted AS inserted,
tup_updated AS updated,
tup_deleted AS deleted,
datname
FROM pg_stat_database
FROM pg_catalog.pg_stat_database
WHERE datname IN (
SELECT datname FROM pg_database
-- Ignore invalid databases, as we will likely have problems with

View File

@@ -3,4 +3,4 @@
-- replay LSN may have advanced past the receive LSN we are using for the
-- calculation.
SELECT GREATEST(0, pg_wal_lsn_diff(pg_last_wal_receive_lsn(), pg_last_wal_replay_lsn())) AS replication_delay_bytes;
SELECT GREATEST(0, pg_catalog.pg_wal_lsn_diff(pg_catalog.pg_last_wal_receive_lsn(), pg_catalog.pg_last_wal_replay_lsn())) AS replication_delay_bytes;

View File

@@ -1,5 +1,5 @@
SELECT
CASE
WHEN pg_last_wal_receive_lsn() = pg_last_wal_replay_lsn() THEN 0
ELSE GREATEST(0, EXTRACT (EPOCH FROM now() - pg_last_xact_replay_timestamp()))
WHEN pg_catalog.pg_last_wal_receive_lsn() = pg_catalog.pg_last_wal_replay_lsn() THEN 0
ELSE GREATEST(0, EXTRACT (EPOCH FROM pg_catalog.now() - pg_catalog.pg_last_xact_replay_timestamp()))
END AS replication_delay_seconds;

View File

@@ -1,10 +1,10 @@
SELECT
slot_name,
pg_wal_lsn_diff(
pg_catalog.pg_wal_lsn_diff(
CASE
WHEN pg_is_in_recovery() THEN pg_last_wal_replay_lsn()
ELSE pg_current_wal_lsn()
WHEN pg_catalog.pg_is_in_recovery() THEN pg_catalog.pg_last_wal_replay_lsn()
ELSE pg_catalog.pg_current_wal_lsn()
END,
restart_lsn)::FLOAT8 AS retained_wal
FROM pg_replication_slots
restart_lsn)::pg_catalog.FLOAT8 AS retained_wal
FROM pg_catalog.pg_replication_slots
WHERE active = false;

View File

@@ -4,4 +4,4 @@ SELECT
WHEN wal_status = 'lost' THEN 1
ELSE 0
END AS wal_is_lost
FROM pg_replication_slots;
FROM pg_catalog.pg_replication_slots;

View File

@@ -279,7 +279,7 @@ fn main() -> Result<()> {
config,
)?;
let exit_code = compute_node.run()?;
let exit_code = compute_node.run().context("running compute node")?;
scenario.teardown();

View File

@@ -24,9 +24,9 @@ pub async fn check_writability(compute: &ComputeNode) -> Result<()> {
});
let query = "
INSERT INTO health_check VALUES (1, now())
INSERT INTO public.health_check VALUES (1, pg_catalog.now())
ON CONFLICT (id) DO UPDATE
SET updated_at = now();";
SET updated_at = pg_catalog.now();";
match client.simple_query(query).await {
Result::Ok(result) => {

View File

@@ -32,8 +32,12 @@ use std::sync::{Arc, Condvar, Mutex, RwLock};
use std::time::{Duration, Instant};
use std::{env, fs};
use tokio::{spawn, sync::watch, task::JoinHandle, time};
use tokio_util::sync::CancellationToken;
use tracing::{Instrument, debug, error, info, instrument, warn};
use url::Url;
use utils::backoff::{
DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS, exponential_backoff_duration,
};
use utils::id::{TenantId, TimelineId};
use utils::lsn::Lsn;
use utils::measured_stream::MeasuredReader;
@@ -192,6 +196,7 @@ pub struct ComputeState {
pub startup_span: Option<tracing::span::Span>,
pub lfc_prewarm_state: LfcPrewarmState,
pub lfc_prewarm_token: CancellationToken,
pub lfc_offload_state: LfcOffloadState,
/// WAL flush LSN that is set after terminating Postgres and syncing safekeepers if
@@ -217,6 +222,7 @@ impl ComputeState {
lfc_offload_state: LfcOffloadState::default(),
terminate_flush_lsn: None,
promote_state: None,
lfc_prewarm_token: CancellationToken::new(),
}
}
@@ -583,7 +589,7 @@ impl ComputeNode {
// that can affect `compute_ctl` and prevent it from properly configuring the database schema.
// Unset them via connection string options before connecting to the database.
// N.B. keep it in sync with `ZENITH_OPTIONS` in `get_maintenance_client()`.
const EXTRA_OPTIONS: &str = "-c role=cloud_admin -c default_transaction_read_only=off -c search_path=public -c statement_timeout=0 -c pgaudit.log=none";
const EXTRA_OPTIONS: &str = "-c role=cloud_admin -c default_transaction_read_only=off -c search_path='' -c statement_timeout=0 -c pgaudit.log=none";
let options = match conn_conf.get_options() {
// Allow the control plane to override any options set by the
// compute
@@ -1554,6 +1560,41 @@ impl ComputeNode {
Ok(lsn)
}
fn sync_safekeepers_with_retries(&self, storage_auth_token: Option<String>) -> Result<Lsn> {
let max_retries = 5;
let mut attempts = 0;
loop {
let result = self.sync_safekeepers(storage_auth_token.clone());
match &result {
Ok(_) => {
if attempts > 0 {
tracing::info!("sync_safekeepers succeeded after {attempts} retries");
}
return result;
}
Err(e) if attempts < max_retries => {
tracing::info!(
"sync_safekeepers failed, will retry (attempt {attempts}): {e:#}"
);
}
Err(err) => {
tracing::warn!(
"sync_safekeepers still failed after {attempts} retries, giving up: {err:?}"
);
return result;
}
}
// sleep and retry
let backoff = exponential_backoff_duration(
attempts,
DEFAULT_BASE_BACKOFF_SECONDS,
DEFAULT_MAX_BACKOFF_SECONDS,
);
std::thread::sleep(backoff);
attempts += 1;
}
}
/// Do all the preparations like PGDATA directory creation, configuration,
/// safekeepers sync, basebackup, etc.
#[instrument(skip_all)]
@@ -1589,7 +1630,7 @@ impl ComputeNode {
lsn
} else {
info!("starting safekeepers syncing");
self.sync_safekeepers(pspec.storage_auth_token.clone())
self.sync_safekeepers_with_retries(pspec.storage_auth_token.clone())
.with_context(|| "failed to sync safekeepers")?
};
info!("safekeepers synced at LSN {}", lsn);
@@ -1884,7 +1925,7 @@ impl ComputeNode {
// It doesn't matter what were the options before, here we just want
// to connect and create a new superuser role.
const ZENITH_OPTIONS: &str = "-c role=zenith_admin -c default_transaction_read_only=off -c search_path=public -c statement_timeout=0";
const ZENITH_OPTIONS: &str = "-c role=zenith_admin -c default_transaction_read_only=off -c search_path='' -c statement_timeout=0";
zenith_admin_conf.options(ZENITH_OPTIONS);
let mut client =
@@ -2339,13 +2380,13 @@ impl ComputeNode {
let result = client
.simple_query(
"SELECT
row_to_json(pg_stat_statements)
pg_catalog.row_to_json(pss)
FROM
pg_stat_statements
public.pg_stat_statements pss
WHERE
userid != 'cloud_admin'::regrole::oid
pss.userid != 'cloud_admin'::pg_catalog.regrole::pg_catalog.oid
ORDER BY
(mean_exec_time + mean_plan_time) DESC
(pss.mean_exec_time + pss.mean_plan_time) DESC
LIMIT 100",
)
.await;
@@ -2473,11 +2514,11 @@ LIMIT 100",
// check the role grants first - to gracefully handle read-replicas.
let select = "SELECT privilege_type
FROM pg_namespace
JOIN LATERAL (SELECT * FROM aclexplode(nspacl) AS x) acl ON true
JOIN pg_user users ON acl.grantee = users.usesysid
WHERE users.usename = $1
AND nspname = $2";
FROM pg_catalog.pg_namespace
JOIN LATERAL (SELECT * FROM aclexplode(nspacl) AS x) AS acl ON true
JOIN pg_catalog.pg_user users ON acl.grantee = users.usesysid
WHERE users.usename OPERATOR(pg_catalog.=) $1::pg_catalog.name
AND nspname OPERATOR(pg_catalog.=) $2::pg_catalog.name";
let rows = db_client
.query(select, &[role_name, schema_name])
.await
@@ -2546,8 +2587,9 @@ LIMIT 100",
.await
.with_context(|| format!("Failed to execute query: {query}"))?;
} else {
let query =
format!("CREATE EXTENSION IF NOT EXISTS {ext_name} WITH VERSION {quoted_version}");
let query = format!(
"CREATE EXTENSION IF NOT EXISTS {ext_name} WITH SCHEMA public VERSION {quoted_version}"
);
db_client
.simple_query(&query)
.await

View File

@@ -7,7 +7,8 @@ use http::StatusCode;
use reqwest::Client;
use std::mem::replace;
use std::sync::Arc;
use tokio::{io::AsyncReadExt, spawn};
use tokio::{io::AsyncReadExt, select, spawn};
use tokio_util::sync::CancellationToken;
use tracing::{error, info};
#[derive(serde::Serialize, Default)]
@@ -92,34 +93,35 @@ impl ComputeNode {
/// If there is a prewarm request ongoing, return `false`, `true` otherwise.
/// Has a failpoint "compute-prewarm"
pub fn prewarm_lfc(self: &Arc<Self>, from_endpoint: Option<String>) -> bool {
let token: CancellationToken;
{
let state = &mut self.state.lock().unwrap().lfc_prewarm_state;
if let LfcPrewarmState::Prewarming = replace(state, LfcPrewarmState::Prewarming) {
let state = &mut self.state.lock().unwrap();
token = state.lfc_prewarm_token.clone();
if let LfcPrewarmState::Prewarming =
replace(&mut state.lfc_prewarm_state, LfcPrewarmState::Prewarming)
{
return false;
}
}
crate::metrics::LFC_PREWARMS.inc();
let cloned = self.clone();
let this = self.clone();
spawn(async move {
let state = match cloned.prewarm_impl(from_endpoint).await {
Ok(true) => LfcPrewarmState::Completed,
Ok(false) => {
info!(
"skipping LFC prewarm because LFC state is not found in endpoint storage"
);
LfcPrewarmState::Skipped
}
let prewarm_state = match this.prewarm_impl(from_endpoint, token).await {
Ok(state) => state,
Err(err) => {
crate::metrics::LFC_PREWARM_ERRORS.inc();
error!(%err, "could not prewarm LFC");
LfcPrewarmState::Failed {
error: format!("{err:#}"),
}
let error = format!("{err:#}");
LfcPrewarmState::Failed { error }
}
};
cloned.state.lock().unwrap().lfc_prewarm_state = state;
let state = &mut this.state.lock().unwrap();
if let LfcPrewarmState::Cancelled = prewarm_state {
state.lfc_prewarm_token = CancellationToken::new();
}
state.lfc_prewarm_state = prewarm_state;
});
true
}
@@ -132,47 +134,70 @@ impl ComputeNode {
/// Request LFC state from endpoint storage and load corresponding pages into Postgres.
/// Returns a result with `false` if the LFC state is not found in endpoint storage.
async fn prewarm_impl(&self, from_endpoint: Option<String>) -> Result<bool> {
let EndpointStoragePair { url, token } = self.endpoint_storage_pair(from_endpoint)?;
async fn prewarm_impl(
&self,
from_endpoint: Option<String>,
token: CancellationToken,
) -> Result<LfcPrewarmState> {
let EndpointStoragePair {
url,
token: storage_token,
} = self.endpoint_storage_pair(from_endpoint)?;
#[cfg(feature = "testing")]
fail::fail_point!("compute-prewarm", |_| {
bail!("prewarm configured to fail because of a failpoint")
});
fail::fail_point!("compute-prewarm", |_| bail!("compute-prewarm failpoint"));
info!(%url, "requesting LFC state from endpoint storage");
let request = Client::new().get(&url).bearer_auth(token);
let res = request.send().await.context("querying endpoint storage")?;
match res.status() {
let request = Client::new().get(&url).bearer_auth(storage_token);
let response = select! {
_ = token.cancelled() => return Ok(LfcPrewarmState::Cancelled),
response = request.send() => response
}
.context("querying endpoint storage")?;
match response.status() {
StatusCode::OK => (),
StatusCode::NOT_FOUND => {
return Ok(false);
}
StatusCode::NOT_FOUND => return Ok(LfcPrewarmState::Skipped),
status => bail!("{status} querying endpoint storage"),
}
let mut uncompressed = Vec::new();
let lfc_state = res
.bytes()
.await
.context("getting request body from endpoint storage")?;
ZstdDecoder::new(lfc_state.iter().as_slice())
.read_to_end(&mut uncompressed)
.await
.context("decoding LFC state")?;
let lfc_state = select! {
_ = token.cancelled() => return Ok(LfcPrewarmState::Cancelled),
lfc_state = response.bytes() => lfc_state
}
.context("getting request body from endpoint storage")?;
let mut decoder = ZstdDecoder::new(lfc_state.iter().as_slice());
select! {
_ = token.cancelled() => return Ok(LfcPrewarmState::Cancelled),
read = decoder.read_to_end(&mut uncompressed) => read
}
.context("decoding LFC state")?;
let uncompressed_len = uncompressed.len();
info!(%url, "downloaded LFC state, uncompressed size {uncompressed_len}");
info!(%url, "downloaded LFC state, uncompressed size {uncompressed_len}, loading into Postgres");
ComputeNode::get_maintenance_client(&self.tokio_conn_conf)
// Client connection and prewarm info querying are fast and therefore don't need
// cancellation
let client = ComputeNode::get_maintenance_client(&self.tokio_conn_conf)
.await
.context("connecting to postgres")?
.query_one("select neon.prewarm_local_cache($1)", &[&uncompressed])
.await
.context("loading LFC state into postgres")
.map(|_| ())?;
.context("connecting to postgres")?;
let pg_token = client.cancel_token();
Ok(true)
let params: Vec<&(dyn postgres_types::ToSql + Sync)> = vec![&uncompressed];
select! {
res = client.query_one("select neon.prewarm_local_cache($1)", &params) => res,
_ = token.cancelled() => {
pg_token.cancel_query(postgres::NoTls).await
.context("cancelling neon.prewarm_local_cache()")?;
return Ok(LfcPrewarmState::Cancelled)
}
}
.context("loading LFC state into postgres")
.map(|_| ())?;
Ok(LfcPrewarmState::Completed)
}
/// If offload request is ongoing, return false, true otherwise
@@ -200,20 +225,20 @@ impl ComputeNode {
async fn offload_lfc_with_state_update(&self) {
crate::metrics::LFC_OFFLOADS.inc();
let Err(err) = self.offload_lfc_impl().await else {
self.state.lock().unwrap().lfc_offload_state = LfcOffloadState::Completed;
return;
let state = match self.offload_lfc_impl().await {
Ok(state) => state,
Err(err) => {
crate::metrics::LFC_OFFLOAD_ERRORS.inc();
error!(%err, "could not offload LFC");
let error = format!("{err:#}");
LfcOffloadState::Failed { error }
}
};
crate::metrics::LFC_OFFLOAD_ERRORS.inc();
error!(%err, "could not offload LFC state to endpoint storage");
self.state.lock().unwrap().lfc_offload_state = LfcOffloadState::Failed {
error: format!("{err:#}"),
};
self.state.lock().unwrap().lfc_offload_state = state;
}
async fn offload_lfc_impl(&self) -> Result<()> {
async fn offload_lfc_impl(&self) -> Result<LfcOffloadState> {
let EndpointStoragePair { url, token } = self.endpoint_storage_pair(None)?;
info!(%url, "requesting LFC state from Postgres");
@@ -228,7 +253,7 @@ impl ComputeNode {
.context("deserializing LFC state")?;
let Some(state) = state else {
info!(%url, "empty LFC state, not exporting");
return Ok(());
return Ok(LfcOffloadState::Skipped);
};
let mut compressed = Vec::new();
@@ -242,7 +267,7 @@ impl ComputeNode {
let request = Client::new().put(url).bearer_auth(token).body(compressed);
match request.send().await {
Ok(res) if res.status() == StatusCode::OK => Ok(()),
Ok(res) if res.status() == StatusCode::OK => Ok(LfcOffloadState::Completed),
Ok(res) => bail!(
"Request to endpoint storage failed with status: {}",
res.status()
@@ -250,4 +275,8 @@ impl ComputeNode {
Err(err) => Err(err).context("writing to endpoint storage"),
}
}
pub fn cancel_prewarm(self: &Arc<Self>) {
self.state.lock().unwrap().lfc_prewarm_token.cancel();
}
}

View File

@@ -78,7 +78,7 @@ impl ComputeNode {
const RETRIES: i32 = 20;
for i in 0..=RETRIES {
let row = client
.query_one("SELECT pg_last_wal_replay_lsn()", &[])
.query_one("SELECT pg_catalog.pg_last_wal_replay_lsn()", &[])
.await
.context("getting last replay lsn")?;
let lsn: u64 = row.get::<usize, postgres_types::PgLsn>(0).into();
@@ -103,7 +103,7 @@ impl ComputeNode {
.await
.context("setting safekeepers")?;
client
.query("SELECT pg_reload_conf()", &[])
.query("SELECT pg_catalog.pg_reload_conf()", &[])
.await
.context("reloading postgres config")?;
@@ -113,7 +113,7 @@ impl ComputeNode {
});
let row = client
.query_one("SELECT * FROM pg_promote()", &[])
.query_one("SELECT * FROM pg_catalog.pg_promote()", &[])
.await
.context("pg_promote")?;
if !row.get::<usize, bool>(0) {

View File

@@ -139,6 +139,15 @@ paths:
application/json:
schema:
$ref: "#/components/schemas/LfcPrewarmState"
delete:
tags:
- Prewarm
summary: Cancel ongoing LFC prewarm
description: ""
operationId: cancelLfcPrewarm
responses:
202:
description: Prewarm cancelled
/lfc/offload:
post:
@@ -636,7 +645,7 @@ components:
properties:
status:
description: LFC offload status
enum: [not_offloaded, offloading, completed, failed]
enum: [not_offloaded, offloading, completed, skipped, failed]
type: string
error:
description: LFC offload error, if any

View File

@@ -46,3 +46,8 @@ pub(in crate::http) async fn offload(compute: Compute) -> Response {
)
}
}
pub(in crate::http) async fn cancel_prewarm(compute: Compute) -> StatusCode {
compute.cancel_prewarm();
StatusCode::ACCEPTED
}

View File

@@ -99,7 +99,12 @@ impl From<&Server> for Router<Arc<ComputeNode>> {
);
let authenticated_router = Router::<Arc<ComputeNode>>::new()
.route("/lfc/prewarm", get(lfc::prewarm_state).post(lfc::prewarm))
.route(
"/lfc/prewarm",
get(lfc::prewarm_state)
.post(lfc::prewarm)
.delete(lfc::cancel_prewarm),
)
.route("/lfc/offload", get(lfc::offload_state).post(lfc::offload))
.route("/promote", post(promote::promote))
.route("/check_writability", post(check_writability::is_writable))

View File

@@ -19,7 +19,7 @@ async fn list_dbs(client: &mut Client) -> Result<Vec<String>, PostgresError> {
.query(
"SELECT datname FROM pg_catalog.pg_database
WHERE datallowconn
AND datconnlimit <> - 2
AND datconnlimit OPERATOR(pg_catalog.<>) (OPERATOR(pg_catalog.-) 2::pg_catalog.int4)
LIMIT 500",
&[],
)
@@ -67,7 +67,7 @@ pub async fn get_installed_extensions(
let extensions: Vec<(String, String, i32)> = client
.query(
"SELECT extname, extversion, extowner::integer FROM pg_catalog.pg_extension",
"SELECT extname, extversion, extowner::pg_catalog.int4 FROM pg_catalog.pg_extension",
&[],
)
.await?

View File

@@ -76,7 +76,7 @@ impl<'m> MigrationRunner<'m> {
self.client
.simple_query("CREATE SCHEMA IF NOT EXISTS neon_migration")
.await?;
self.client.simple_query("CREATE TABLE IF NOT EXISTS neon_migration.migration_id (key INT NOT NULL PRIMARY KEY, id bigint NOT NULL DEFAULT 0)").await?;
self.client.simple_query("CREATE TABLE IF NOT EXISTS neon_migration.migration_id (key pg_catalog.int4 NOT NULL PRIMARY KEY, id pg_catalog.int8 NOT NULL DEFAULT 0)").await?;
self.client
.simple_query(
"INSERT INTO neon_migration.migration_id VALUES (0, 0) ON CONFLICT DO NOTHING",

View File

@@ -15,17 +15,17 @@ DO $$
DECLARE
role_name text;
BEGIN
FOR role_name IN SELECT rolname FROM pg_roles WHERE pg_has_role(rolname, '{privileged_role_name}', 'member')
FOR role_name IN SELECT rolname FROM pg_catalog.pg_roles WHERE pg_catalog.pg_has_role(rolname, '{privileged_role_name}', 'member')
LOOP
RAISE NOTICE 'EXECUTING ALTER ROLE % INHERIT', quote_ident(role_name);
EXECUTE 'ALTER ROLE ' || quote_ident(role_name) || ' INHERIT';
RAISE NOTICE 'EXECUTING ALTER ROLE % INHERIT', pg_catalog.quote_ident(role_name);
EXECUTE pg_catalog.format('ALTER ROLE %I INHERIT;', role_name);
END LOOP;
FOR role_name IN SELECT rolname FROM pg_roles
FOR role_name IN SELECT rolname FROM pg_catalog.pg_roles
WHERE
NOT pg_has_role(rolname, '{privileged_role_name}', 'member') AND NOT starts_with(rolname, 'pg_')
NOT pg_catalog.pg_has_role(rolname, '{privileged_role_name}', 'member') AND NOT pg_catalog.starts_with(rolname, 'pg_')
LOOP
RAISE NOTICE 'EXECUTING ALTER ROLE % NOBYPASSRLS', quote_ident(role_name);
EXECUTE 'ALTER ROLE ' || quote_ident(role_name) || ' NOBYPASSRLS';
RAISE NOTICE 'EXECUTING ALTER ROLE % NOBYPASSRLS', pg_catalog.quote_ident(role_name);
EXECUTE pg_catalog.format('ALTER ROLE %I NOBYPASSRLS;', role_name);
END LOOP;
END $$;

View File

@@ -1,6 +1,6 @@
DO $$
BEGIN
IF (SELECT setting::numeric >= 160000 FROM pg_settings WHERE name = 'server_version_num') THEN
IF (SELECT setting::pg_catalog.numeric >= 160000 FROM pg_catalog.pg_settings WHERE name = 'server_version_num') THEN
EXECUTE 'GRANT pg_create_subscription TO {privileged_role_name}';
END IF;
END $$;

View File

@@ -5,9 +5,9 @@ DO $$
DECLARE
role_name TEXT;
BEGIN
FOR role_name IN SELECT rolname FROM pg_roles WHERE rolreplication IS TRUE
FOR role_name IN SELECT rolname FROM pg_catalog.pg_roles WHERE rolreplication IS TRUE
LOOP
RAISE NOTICE 'EXECUTING ALTER ROLE % NOREPLICATION', quote_ident(role_name);
EXECUTE 'ALTER ROLE ' || quote_ident(role_name) || ' NOREPLICATION';
RAISE NOTICE 'EXECUTING ALTER ROLE % NOREPLICATION', pg_catalog.quote_ident(role_name);
EXECUTE pg_catalog.format('ALTER ROLE %I NOREPLICATION;', role_name);
END LOOP;
END $$;

View File

@@ -1,6 +1,6 @@
DO $$
BEGIN
IF (SELECT setting::numeric >= 160000 FROM pg_settings WHERE name = 'server_version_num') THEN
IF (SELECT setting::pg_catalog.numeric >= 160000 FROM pg_catalog.pg_settings WHERE name OPERATOR(pg_catalog.=) 'server_version_num'::pg_catalog.text) THEN
EXECUTE 'GRANT EXECUTE ON FUNCTION pg_export_snapshot TO {privileged_role_name}';
EXECUTE 'GRANT EXECUTE ON FUNCTION pg_log_standby_snapshot TO {privileged_role_name}';
END IF;

View File

@@ -2,7 +2,7 @@ DO $$
DECLARE
bypassrls boolean;
BEGIN
SELECT rolbypassrls INTO bypassrls FROM pg_roles WHERE rolname = 'neon_superuser';
SELECT rolbypassrls INTO bypassrls FROM pg_catalog.pg_roles WHERE rolname = 'neon_superuser';
IF NOT bypassrls THEN
RAISE EXCEPTION 'neon_superuser cannot bypass RLS';
END IF;

View File

@@ -4,8 +4,8 @@ DECLARE
BEGIN
FOR role IN
SELECT rolname AS name, rolinherit AS inherit
FROM pg_roles
WHERE pg_has_role(rolname, 'neon_superuser', 'member')
FROM pg_catalog.pg_roles
WHERE pg_catalog.pg_has_role(rolname, 'neon_superuser', 'member')
LOOP
IF NOT role.inherit THEN
RAISE EXCEPTION '% cannot inherit', quote_ident(role.name);
@@ -14,12 +14,12 @@ BEGIN
FOR role IN
SELECT rolname AS name, rolbypassrls AS bypassrls
FROM pg_roles
WHERE NOT pg_has_role(rolname, 'neon_superuser', 'member')
AND NOT starts_with(rolname, 'pg_')
FROM pg_catalog.pg_roles
WHERE NOT pg_catalog.pg_has_role(rolname, 'neon_superuser', 'member')
AND NOT pg_catalog.starts_with(rolname, 'pg_')
LOOP
IF role.bypassrls THEN
RAISE EXCEPTION '% can bypass RLS', quote_ident(role.name);
RAISE EXCEPTION '% can bypass RLS', pg_catalog.quote_ident(role.name);
END IF;
END LOOP;
END $$;

View File

@@ -1,10 +1,10 @@
DO $$
BEGIN
IF (SELECT current_setting('server_version_num')::numeric < 160000) THEN
IF (SELECT pg_catalog.current_setting('server_version_num')::pg_catalog.numeric < 160000) THEN
RETURN;
END IF;
IF NOT (SELECT pg_has_role('neon_superuser', 'pg_create_subscription', 'member')) THEN
IF NOT (SELECT pg_catalog.pg_has_role('neon_superuser', 'pg_create_subscription', 'member')) THEN
RAISE EXCEPTION 'neon_superuser cannot execute pg_create_subscription';
END IF;
END $$;

View File

@@ -2,12 +2,12 @@ DO $$
DECLARE
monitor record;
BEGIN
SELECT pg_has_role('neon_superuser', 'pg_monitor', 'member') AS member,
SELECT pg_catalog.pg_has_role('neon_superuser', 'pg_monitor', 'member') AS member,
admin_option AS admin
INTO monitor
FROM pg_auth_members
WHERE roleid = 'pg_monitor'::regrole
AND member = 'neon_superuser'::regrole;
FROM pg_catalog.pg_auth_members
WHERE roleid = 'pg_monitor'::pg_catalog.regrole
AND member = 'neon_superuser'::pg_catalog.regrole;
IF monitor IS NULL THEN
RAISE EXCEPTION 'no entry in pg_auth_members for neon_superuser and pg_monitor';

View File

@@ -2,11 +2,11 @@ DO $$
DECLARE
can_execute boolean;
BEGIN
SELECT bool_and(has_function_privilege('neon_superuser', oid, 'execute'))
SELECT pg_catalog.bool_and(pg_catalog.has_function_privilege('neon_superuser', oid, 'execute'))
INTO can_execute
FROM pg_proc
FROM pg_catalog.pg_proc
WHERE proname IN ('pg_export_snapshot', 'pg_log_standby_snapshot')
AND pronamespace = 'pg_catalog'::regnamespace;
AND pronamespace = 'pg_catalog'::pg_catalog.regnamespace;
IF NOT can_execute THEN
RAISE EXCEPTION 'neon_superuser cannot execute both pg_export_snapshot and pg_log_standby_snapshot';
END IF;

View File

@@ -2,9 +2,9 @@ DO $$
DECLARE
can_execute boolean;
BEGIN
SELECT has_function_privilege('neon_superuser', oid, 'execute')
SELECT pg_catalog.has_function_privilege('neon_superuser', oid, 'execute')
INTO can_execute
FROM pg_proc
FROM pg_catalog.pg_proc
WHERE proname = 'pg_show_replication_origin_status'
AND pronamespace = 'pg_catalog'::regnamespace;
IF NOT can_execute THEN

View File

@@ -2,10 +2,10 @@ DO $$
DECLARE
signal_backend record;
BEGIN
SELECT pg_has_role('neon_superuser', 'pg_signal_backend', 'member') AS member,
SELECT pg_catalog.pg_has_role('neon_superuser', 'pg_signal_backend', 'member') AS member,
admin_option AS admin
INTO signal_backend
FROM pg_auth_members
FROM pg_catalog.pg_auth_members
WHERE roleid = 'pg_signal_backend'::regrole
AND member = 'neon_superuser'::regrole;

View File

@@ -407,9 +407,9 @@ fn get_database_stats(cli: &mut Client) -> anyhow::Result<(f64, i64)> {
// like `postgres_exporter` use it to query Postgres statistics.
// Use explicit 8 bytes type casts to match Rust types.
let stats = cli.query_one(
"SELECT coalesce(sum(active_time), 0.0)::float8 AS total_active_time,
coalesce(sum(sessions), 0)::bigint AS total_sessions
FROM pg_stat_database
"SELECT pg_catalog.coalesce(pg_catalog.sum(active_time), 0.0)::pg_catalog.float8 AS total_active_time,
pg_catalog.coalesce(pg_catalog.sum(sessions), 0)::pg_catalog.bigint AS total_sessions
FROM pg_catalog.pg_stat_database
WHERE datname NOT IN (
'postgres',
'template0',
@@ -445,11 +445,11 @@ fn get_backends_state_change(cli: &mut Client) -> anyhow::Result<Option<DateTime
let mut last_active: Option<DateTime<Utc>> = None;
// Get all running client backends except ourself, use RFC3339 DateTime format.
let backends = cli.query(
"SELECT state, to_char(state_change, 'YYYY-MM-DD\"T\"HH24:MI:SS.US\"Z\"') AS state_change
"SELECT state, pg_catalog.to_char(state_change, 'YYYY-MM-DD\"T\"HH24:MI:SS.US\"Z\"'::pg_catalog.text) AS state_change
FROM pg_stat_activity
WHERE backend_type = 'client backend'
AND pid != pg_backend_pid()
AND usename != 'cloud_admin';", // XXX: find a better way to filter other monitors?
WHERE backend_type OPERATOR(pg_catalog.=) 'client backend'::pg_catalog.text
AND pid OPERATOR(pg_catalog.!=) pg_catalog.pg_backend_pid()
AND usename OPERATOR(pg_catalog.!=) 'cloud_admin'::pg_catalog.name;", // XXX: find a better way to filter other monitors?
&[],
);

View File

@@ -299,9 +299,9 @@ pub async fn get_existing_dbs_async(
.query_raw::<str, &String, &[String; 0]>(
"SELECT
datname AS name,
(SELECT rolname FROM pg_roles WHERE oid = datdba) AS owner,
(SELECT rolname FROM pg_catalog.pg_roles WHERE oid OPERATOR(pg_catalog.=) datdba) AS owner,
NOT datallowconn AS restrict_conn,
datconnlimit = - 2 AS invalid
datconnlimit OPERATOR(pg_catalog.=) (OPERATOR(pg_catalog.-) 2) AS invalid
FROM
pg_catalog.pg_database;",
&[],

View File

@@ -82,7 +82,7 @@ impl ComputeNode {
info!("Checking if drop subscription operation was already performed for timeline_id: {}", timeline_id);
drop_subscriptions_done = match
client.query("select 1 from neon.drop_subscriptions_done where timeline_id = $1", &[&timeline_id.to_string()]).await {
client.query("select 1 from neon.drop_subscriptions_done where timeline_id OPERATOR(pg_catalog.=) $1", &[&timeline_id.to_string()]).await {
Ok(result) => !result.is_empty(),
Err(e) =>
{
@@ -1142,7 +1142,9 @@ async fn get_operations<'a>(
if let Some(libs) = spec.cluster.settings.find("shared_preload_libraries") {
if libs.contains("pg_stat_statements") {
return Ok(Box::new(once(Operation {
query: String::from("CREATE EXTENSION IF NOT EXISTS pg_stat_statements"),
query: String::from(
"CREATE EXTENSION IF NOT EXISTS pg_stat_statements WITH SCHEMA public",
),
comment: Some(String::from("create system extensions")),
})));
}
@@ -1150,11 +1152,13 @@ async fn get_operations<'a>(
Ok(Box::new(empty()))
}
ApplySpecPhase::CreatePgauditExtension => Ok(Box::new(once(Operation {
query: String::from("CREATE EXTENSION IF NOT EXISTS pgaudit"),
query: String::from("CREATE EXTENSION IF NOT EXISTS pgaudit WITH SCHEMA public"),
comment: Some(String::from("create pgaudit extensions")),
}))),
ApplySpecPhase::CreatePgauditlogtofileExtension => Ok(Box::new(once(Operation {
query: String::from("CREATE EXTENSION IF NOT EXISTS pgauditlogtofile"),
query: String::from(
"CREATE EXTENSION IF NOT EXISTS pgauditlogtofile WITH SCHEMA public",
),
comment: Some(String::from("create pgauditlogtofile extensions")),
}))),
// Disable pgaudit logging for postgres database.
@@ -1178,7 +1182,7 @@ async fn get_operations<'a>(
},
Operation {
query: String::from(
"UPDATE pg_extension SET extrelocatable = true WHERE extname = 'neon'",
"UPDATE pg_catalog.pg_extension SET extrelocatable = true WHERE extname OPERATOR(pg_catalog.=) 'neon'::pg_catalog.name AND extrelocatable OPERATOR(pg_catalog.=) false",
),
comment: Some(String::from("compat/fix: make neon relocatable")),
},

View File

@@ -3,16 +3,17 @@ BEGIN
IF NOT EXISTS(
SELECT 1
FROM pg_catalog.pg_tables
WHERE tablename = 'health_check'
WHERE tablename::pg_catalog.name OPERATOR(pg_catalog.=) 'health_check'::pg_catalog.name
AND schemaname::pg_catalog.name OPERATOR(pg_catalog.=) 'public'::pg_catalog.name
)
THEN
CREATE TABLE health_check (
id serial primary key,
updated_at timestamptz default now()
CREATE TABLE public.health_check (
id pg_catalog.int4 primary key generated by default as identity,
updated_at pg_catalog.timestamptz default pg_catalog.now()
);
INSERT INTO health_check VALUES (1, now())
INSERT INTO public.health_check VALUES (1, pg_catalog.now())
ON CONFLICT (id) DO UPDATE
SET updated_at = now();
SET updated_at = pg_catalog.now();
END IF;
END
$$

View File

@@ -2,10 +2,11 @@ DO $$
DECLARE
query varchar;
BEGIN
FOR query IN SELECT 'ALTER FUNCTION '||nsp.nspname||'.'||p.proname||'('||pg_get_function_identity_arguments(p.oid)||') OWNER TO {db_owner};'
FROM pg_proc p
JOIN pg_namespace nsp ON p.pronamespace = nsp.oid
WHERE nsp.nspname = 'anon' LOOP
FOR query IN
SELECT pg_catalog.format('ALTER FUNCTION %I(%s) OWNER TO {db_owner};', p.oid::regproc, pg_catalog.pg_get_function_identity_arguments(p.oid))
FROM pg_catalog.pg_proc p
WHERE p.pronamespace OPERATOR(pg_catalog.=) 'anon'::regnamespace::oid
LOOP
EXECUTE query;
END LOOP;
END

View File

@@ -1,6 +1,6 @@
DO $$
BEGIN
IF NOT EXISTS (SELECT FROM pg_catalog.pg_roles WHERE rolname = '{privileged_role_name}')
IF NOT EXISTS (SELECT FROM pg_catalog.pg_roles WHERE rolname OPERATOR(pg_catalog.=) '{privileged_role_name}'::pg_catalog.name)
THEN
CREATE ROLE {privileged_role_name} {privileges} IN ROLE pg_read_all_data, pg_write_all_data;
END IF;

View File

@@ -4,14 +4,14 @@ $$
IF EXISTS(
SELECT nspname
FROM pg_catalog.pg_namespace
WHERE nspname = 'public'
WHERE nspname OPERATOR(pg_catalog.=) 'public'
) AND
current_setting('server_version_num')::int / 10000 >= 15
pg_catalog.current_setting('server_version_num')::int OPERATOR(pg_catalog./) 10000 OPERATOR(pg_catalog.>=) 15
THEN
IF EXISTS(
SELECT rolname
FROM pg_catalog.pg_roles
WHERE rolname = 'web_access'
WHERE rolname OPERATOR(pg_catalog.=) 'web_access'
)
THEN
GRANT CREATE ON SCHEMA public TO web_access;
@@ -20,7 +20,7 @@ $$
IF EXISTS(
SELECT nspname
FROM pg_catalog.pg_namespace
WHERE nspname = 'public'
WHERE nspname OPERATOR(pg_catalog.=) 'public'
)
THEN
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT ALL ON TABLES TO neon_superuser WITH GRANT OPTION;

View File

@@ -2,11 +2,17 @@ DO ${outer_tag}$
DECLARE
subname TEXT;
BEGIN
LOCK TABLE pg_subscription IN ACCESS EXCLUSIVE MODE;
FOR subname IN SELECT pg_subscription.subname FROM pg_subscription WHERE subdbid = (SELECT oid FROM pg_database WHERE datname = {datname_str}) LOOP
EXECUTE format('ALTER SUBSCRIPTION %I DISABLE;', subname);
EXECUTE format('ALTER SUBSCRIPTION %I SET (slot_name = NONE);', subname);
EXECUTE format('DROP SUBSCRIPTION %I;', subname);
LOCK TABLE pg_catalog.pg_subscription IN ACCESS EXCLUSIVE MODE;
FOR subname IN
SELECT pg_subscription.subname
FROM pg_catalog.pg_subscription
WHERE subdbid OPERATOR(pg_catalog.=) (
SELECT oid FROM pg_database WHERE datname OPERATOR(pg_catalog.=) {datname_str}::pg_catalog.name
)
LOOP
EXECUTE pg_catalog.format('ALTER SUBSCRIPTION %I DISABLE;', subname);
EXECUTE pg_catalog.format('ALTER SUBSCRIPTION %I SET (slot_name = NONE);', subname);
EXECUTE pg_catalog.format('DROP SUBSCRIPTION %I;', subname);
END LOOP;
END;
${outer_tag}$;

View File

@@ -3,19 +3,19 @@ BEGIN
IF NOT EXISTS(
SELECT 1
FROM pg_catalog.pg_tables
WHERE tablename = 'drop_subscriptions_done'
AND schemaname = 'neon'
WHERE tablename OPERATOR(pg_catalog.=) 'drop_subscriptions_done'::pg_catalog.name
AND schemaname OPERATOR(pg_catalog.=) 'neon'::pg_catalog.name
)
THEN
CREATE TABLE neon.drop_subscriptions_done
(id serial primary key, timeline_id text);
(id pg_catalog.int4 primary key generated by default as identity, timeline_id pg_catalog.text);
END IF;
-- preserve the timeline_id of the last drop_subscriptions run
-- to ensure that the cleanup of a timeline is executed only once.
-- use upsert to avoid the table bloat in case of cascade branching (branch of a branch)
INSERT INTO neon.drop_subscriptions_done VALUES (1, current_setting('neon.timeline_id'))
INSERT INTO neon.drop_subscriptions_done VALUES (1, pg_catalog.current_setting('neon.timeline_id'))
ON CONFLICT (id) DO UPDATE
SET timeline_id = current_setting('neon.timeline_id');
SET timeline_id = pg_catalog.current_setting('neon.timeline_id')::pg_catalog.text;
END
$$

View File

@@ -15,15 +15,15 @@ BEGIN
WHERE schema_name IN ('public')
LOOP
FOR grantor IN EXECUTE
format(
'SELECT DISTINCT rtg.grantor FROM information_schema.role_table_grants AS rtg WHERE grantee = %s',
pg_catalog.format(
'SELECT DISTINCT rtg.grantor FROM information_schema.role_table_grants AS rtg WHERE grantee OPERATOR(pg_catalog.=) %s',
-- N.B. this has to be properly dollar-escaped with `pg_quote_dollar()`
quote_literal({role_name})
)
LOOP
EXECUTE format('SET LOCAL ROLE %I', grantor);
EXECUTE pg_catalog.format('SET LOCAL ROLE %I', grantor);
revoke_query := format(
revoke_query := pg_catalog.format(
'REVOKE ALL PRIVILEGES ON ALL TABLES IN SCHEMA %I FROM %I GRANTED BY %I',
schema,
-- N.B. this has to be properly dollar-escaped with `pg_quote_dollar()`

View File

@@ -5,17 +5,17 @@ DO ${outer_tag}$
IF EXISTS(
SELECT nspname
FROM pg_catalog.pg_namespace
WHERE nspname = 'public'
WHERE nspname OPERATOR(pg_catalog.=) 'public'::pg_catalog.name
)
THEN
SELECT nspowner::regrole::text
FROM pg_catalog.pg_namespace
WHERE nspname = 'public'
WHERE nspname OPERATOR(pg_catalog.=) 'public'::pg_catalog.text
INTO schema_owner;
IF schema_owner = 'cloud_admin' OR schema_owner = 'zenith_admin'
IF schema_owner OPERATOR(pg_catalog.=) 'cloud_admin'::pg_catalog.text OR schema_owner OPERATOR(pg_catalog.=) 'zenith_admin'::pg_catalog.text
THEN
EXECUTE format('ALTER SCHEMA public OWNER TO %I', {db_owner});
EXECUTE pg_catalog.format('ALTER SCHEMA public OWNER TO %I', {db_owner});
END IF;
END IF;
END

View File

@@ -3,10 +3,10 @@ DO ${outer_tag}$
IF EXISTS(
SELECT 1
FROM pg_catalog.pg_database
WHERE datname = {datname}
WHERE datname OPERATOR(pg_catalog.=) {datname}::pg_catalog.name
)
THEN
EXECUTE format('ALTER DATABASE %I is_template false', {datname});
EXECUTE pg_catalog.format('ALTER DATABASE %I is_template false', {datname});
END IF;
END
${outer_tag}$;

View File

@@ -0,0 +1,246 @@
# Node deletion API improvement
Created on 2025-07-07
Implemented on _TBD_
## Summary
This RFC describes improvements to the storage controller API for gracefully deleting pageserver
nodes.
## Motivation
The basic node deletion API introduced in [#8226](https://github.com/neondatabase/neon/issues/8333)
has several limitations:
- Deleted nodes can re-add themselves if they restart (e.g., a flaky node that keeps restarting and
we cannot reach via SSH to stop the pageserver). This issue has been resolved by tombstone
mechanism in [#12036](https://github.com/neondatabase/neon/issues/12036)
- Process of node deletion is not graceful, i.e. it just imitates a node failure
In this context, "graceful" node deletion means that users do not experience any disruption or
negative effects, provided the system remains in a healthy state (i.e., the remaining pageservers
can handle the workload and all requirements are met). To achieve this, the system must perform
live migration of all tenant shards from the node being deleted while the node is still running
and continue processing all incoming requests. The node is removed only after all tenant shards
have been safely migrated.
Although live migrations can be achieved with the drain functionality, it leads to incorrect shard
placement, such as not matching availability zones. This results in unnecessary work to optimize
the placement that was just recently performed.
If we delete a node before its tenant shards are fully moved, the new node won't have all the
needed data (e.g. heatmaps) ready. This means user requests to the new node will be much slower at
first. If there are many tenant shards, this slowdown affects a huge amount of users.
Graceful node deletion is more complicated and can introduce new issues. It takes longer because
live migration of each tenant shard can last several minutes. Using non-blocking accessors may
also cause deletion to wait if other processes are holding inner state lock. It also gets trickier
because we need to handle other requests, like drain and fill, at the same time.
## Impacted components (e.g. pageserver, safekeeper, console, etc)
- storage controller
- pageserver (indirectly)
## Proposed implementation
### Tombstones
To resolve the problem of deleted nodes re-adding themselves, a tombstone mechanism was introduced
as part of the node stored information. Each node has a separate `NodeLifecycle` field with two
possible states: `Active` and `Deleted`. When node deletion completes, the database row is not
deleted but instead has its `NodeLifecycle` column switched to `Deleted`. Nodes with `Deleted`
lifecycle are treated as if the row is absent for most handlers, with several exceptions: reattach
and register functionality must be aware of tombstones. Additionally, new debug handlers are
available for listing and deleting tombstones via the `/debug/v1/tombstone` path.
### Gracefulness
The problem of making node deletion graceful is complex and involves several challenges:
- **Cancellable**: The operation must be cancellable to allow administrators to abort the process
if needed, e.g. if run by mistake.
- **Non-blocking**: We don't want to block deployment operations like draining/filling on the node
deletion process. We need clear policies for handling concurrent operations: what happens when a
drain/fill request arrives while deletion is in progress, and what happens when a delete request
arrives while drain/fill is in progress.
- **Persistent**: If the storage controller restarts during this long-running operation, we must
preserve progress and automatically resume the deletion process after the storage controller
restarts.
- **Migrated correctly**: We cannot simply use the existing drain mechanism for nodes scheduled
for deletion, as this would move shards to irrelevant locations. The drain process expects the
node to return, so it only moves shards to backup locations, not to their preferred AZs. It also
leaves secondary locations unmoved. This could result in unnecessary load on the storage
controller and inefficient resource utilization.
- **Force option**: Administrators need the ability to force immediate, non-graceful deletion when
time constraints or emergency situations require it, bypassing the normal graceful migration
process.
See below for a detailed breakdown of the proposed changes and mechanisms.
#### Node lifecycle
New `NodeLifecycle` enum and a matching database field with these values:
- `Active`: The normal state. All operations are allowed.
- `ScheduledForDeletion`: The node is marked to be deleted soon. Deletion may be in progress or
will happen later, but the node will eventually be removed. All operations are allowed.
- `Deleted`: The node is fully deleted. No operations are allowed, and the node cannot be brought
back. The only action left is to remove its record from the database. Any attempt to register a
node in this state will fail.
This state persists across storage controller restarts.
**State transition**
```
+--------------------+
+---| Active |<---------------------+
| +--------------------+ |
| ^ |
| start_node_delete | cancel_node_delete |
v | |
+----------------------------------+ |
| ScheduledForDeletion | |
+----------------------------------+ |
| |
| node_register |
| |
| delete_node (at the finish) |
| |
v |
+---------+ tombstone_delete +----------+
| Deleted |-------------------------------->| no row |
+---------+ +----------+
```
#### NodeSchedulingPolicy::Deleting
A `Deleting` variant to the `NodeSchedulingPolicy` enum. This means the deletion function is
running for the node right now. Only one node can have the `Deleting` policy at a time.
The `NodeSchedulingPolicy::Deleting` state is persisted in the database. However, after a storage
controller restart, any node previously marked as `Deleting` will have its scheduling policy reset
to `Pause`. The policy will only transition back to `Deleting` when the deletion operation is
actively started again, as triggered by the node's `NodeLifecycle::ScheduledForDeletion` state.
`NodeSchedulingPolicy` transition details:
1. When `node_delete` begins, set the policy to `NodeSchedulingPolicy::Deleting`.
2. If `node_delete` is cancelled (for example, due to a concurrent drain operation), revert the
policy to its previous value. The policy is persisted in storcon DB.
3. After `node_delete` completes, the final value of the scheduling policy is irrelevant, since
`NodeLifecycle::Deleted` prevents any further access to this field.
The deletion process cannot be initiated for nodes currently undergoing deployment-related
operations (`Draining`, `Filling`, or `PauseForRestart` policies). Deletion will only be triggered
once the node transitions to either the `Active` or `Pause` state.
#### OperationTracker
A replacement for `Option<OperationHandler> ongoing_operation`, the `OperationTracker` is a
dedicated service state object responsible for managing all long-running node operations (drain,
fill, delete) with robust concurrency control.
Key responsibilities:
- Orchestrates the execution of operations
- Supports cancellation of currently running operations
- Enforces operation constraints, e.g. allowing only single drain/fill operation at a time
- Persists deletion state, enabling recovery of pending deletions across restarts
- Ensures thread safety across concurrent requests
#### Attached tenant shard processing
When deleting a node, handle each attached tenant shard as follows:
1. Pick the best node to become the new attached (the candidate).
2. If the candidate already has this shard as a secondary:
- Create a new secondary for the shard on another suitable node.
Otherwise:
- Create a secondary for the shard on the candidate node.
3. Wait until all secondaries are ready and pre-warmed.
4. Promote the candidate's secondary to attached.
5. Remove the secondary from the node being deleted.
This process safely moves all attached shards before deleting the node.
#### Secondary tenant shard processing
When deleting a node, handle each secondary tenant shard as follows:
1. Choose the best node to become the new secondary.
2. Create a secondary for the shard on that node.
3. Wait until the new secondary is ready.
4. Remove the secondary from the node being deleted.
This ensures all secondary shards are safely moved before deleting the node.
### Reliability, failure modes and corner cases
In case of a storage controller failure and following restart, the system behavior depends on the
`NodeLifecycle` state:
- If `NodeLifecycle` is `Active`: No action is taken for this node.
- If `NodeLifecycle` is `Deleted`: The node will not be re-added.
- If `NodeLifecycle` is `ScheduledForDeletion`: A deletion background task will be launched for
this node.
In case of a pageserver node failure during deletion, the behavior depends on the `force` flag:
- If `force` is set: The node deletion will proceed regardless of the node's availability.
- If `force` is not set: The deletion will be retried a limited number of times. If the node
remains unavailable, the deletion process will pause and automatically resume when the node
becomes healthy again.
### Operations concurrency
The following sections describe the behavior when different types of requests arrive at the storage
controller and how they interact with ongoing operations.
#### Delete request
Handler: `PUT /control/v1/node/:node_id/delete`
1. If node lifecycle is `NodeLifecycle::ScheduledForDeletion`:
- Return `200 OK`: there is already an ongoing deletion request for this node
2. Update & persist lifecycle to `NodeLifecycle::ScheduledForDeletion`
3. Persist current scheduling policy
4. If there is no active operation (drain/fill/delete):
- Run deletion process for this node
#### Cancel delete request
Handler: `DELETE /control/v1/node/:node_id/delete`
1. If node lifecycle is not `NodeLifecycle::ScheduledForDeletion`:
- Return `404 Not Found`: there is no current deletion request for this node
2. If the active operation is deleting this node, cancel it
3. Update & persist lifecycle to `NodeLifecycle::Active`
4. Restore the last scheduling policy from persistence
#### Drain/fill request
1. If there are already ongoing drain/fill processes:
- Return `409 Conflict`: queueing of drain/fill processes is not supported
2. If there is an ongoing delete process:
- Cancel it and wait until it is cancelled
3. Run the drain/fill process
4. After the drain/fill process is cancelled or finished:
- Try to find another candidate to delete and run the deletion process for that node
#### Drain/fill cancel request
1. If the active operation is not the related process:
- Return `400 Bad Request`: cancellation request is incorrect, operations are not the same
2. Cancel the active operation
3. Try to find another candidate to delete and run the deletion process for that node
## Definition of Done
- [x] Fix flaky node scenario and introduce related debug handlers
- [ ] Node deletion intent is persistent - a node will be eventually deleted after a deletion
request regardless of draining/filling requests and restarts
- [ ] Node deletion can be graceful - deletion completes only after moving all tenant shards to
recommended locations
- [ ] Deploying does not break due to long deletions - drain/fill operations override deletion
process and deletion resumes after drain/fill completes
- [ ] `force` flag is implemented and provides fast, failure-tolerant node removal (e.g., when a
pageserver node does not respond)
- [ ] Legacy delete handler code is removed from storage_controller, test_runner, and storcon_cli

View File

@@ -68,11 +68,15 @@ pub enum LfcPrewarmState {
/// We tried to fetch the corresponding LFC state from the endpoint storage,
/// but received `Not Found 404`. This should normally happen only during the
/// first endpoint start after creation with `autoprewarm: true`.
/// This may also happen if LFC is turned off or not initialized
///
/// During the orchestrated prewarm via API, when a caller explicitly
/// provides the LFC state key to prewarm from, it's the caller responsibility
/// to handle this status as an error state in this case.
Skipped,
/// LFC prewarm was cancelled. Some pages in LFC cache may be prewarmed if query
/// has started working before cancellation
Cancelled,
}
impl Display for LfcPrewarmState {
@@ -83,6 +87,7 @@ impl Display for LfcPrewarmState {
LfcPrewarmState::Completed => f.write_str("Completed"),
LfcPrewarmState::Skipped => f.write_str("Skipped"),
LfcPrewarmState::Failed { error } => write!(f, "Error({error})"),
LfcPrewarmState::Cancelled => f.write_str("Cancelled"),
}
}
}
@@ -97,6 +102,7 @@ pub enum LfcOffloadState {
Failed {
error: String,
},
Skipped,
}
#[derive(Serialize, Debug, Clone, PartialEq)]

View File

@@ -341,6 +341,34 @@ extern "C-unwind" fn log_internal(
}
}
/* BEGIN_HADRON */
extern "C" fn reset_safekeeper_statuses_for_metrics(wp: *mut WalProposer, num_safekeepers: u32) {
unsafe {
let callback_data = (*(*wp).config).callback_data;
let api = callback_data as *mut Box<dyn ApiImpl>;
if api.is_null() {
return;
}
(*api).reset_safekeeper_statuses_for_metrics(&mut (*wp), num_safekeepers);
}
}
extern "C" fn update_safekeeper_status_for_metrics(
wp: *mut WalProposer,
sk_index: u32,
status: u8,
) {
unsafe {
let callback_data = (*(*wp).config).callback_data;
let api = callback_data as *mut Box<dyn ApiImpl>;
if api.is_null() {
return;
}
(*api).update_safekeeper_status_for_metrics(&mut (*wp), sk_index, status);
}
}
/* END_HADRON */
#[derive(Debug, PartialEq)]
pub enum Level {
Debug5,
@@ -414,6 +442,10 @@ pub(crate) fn create_api() -> walproposer_api {
finish_sync_safekeepers: Some(finish_sync_safekeepers),
process_safekeeper_feedback: Some(process_safekeeper_feedback),
log_internal: Some(log_internal),
/* BEGIN_HADRON */
reset_safekeeper_statuses_for_metrics: Some(reset_safekeeper_statuses_for_metrics),
update_safekeeper_status_for_metrics: Some(update_safekeeper_status_for_metrics),
/* END_HADRON */
}
}
@@ -451,6 +483,8 @@ pub fn empty_shmem() -> crate::bindings::WalproposerShmemState {
replica_promote: false,
min_ps_feedback: empty_feedback,
wal_rate_limiter: empty_wal_rate_limiter,
num_safekeepers: 0,
safekeeper_status: [0; 32],
}
}

View File

@@ -159,6 +159,21 @@ pub trait ApiImpl {
fn after_election(&self, _wp: &mut WalProposer) {
todo!()
}
/* BEGIN_HADRON */
fn reset_safekeeper_statuses_for_metrics(&self, _wp: &mut WalProposer, _num_safekeepers: u32) {
// Do nothing for testing purposes.
}
fn update_safekeeper_status_for_metrics(
&self,
_wp: &mut WalProposer,
_sk_index: u32,
_status: u8,
) {
// Do nothing for testing purposes.
}
/* END_HADRON */
}
#[derive(Debug)]

View File

@@ -391,6 +391,12 @@ neon_get_perf_counters(PG_FUNCTION_ARGS)
neon_per_backend_counters totals = {0};
metric_t *metrics;
/* BEGIN_HADRON */
WalproposerShmemState *wp_shmem;
uint32 num_safekeepers;
uint32 num_active_safekeepers;
/* END_HADRON */
/* We put all the tuples into a tuplestore in one go. */
InitMaterializedSRF(fcinfo, 0);
@@ -437,11 +443,32 @@ neon_get_perf_counters(PG_FUNCTION_ARGS)
// Not ideal but piggyback our databricks counters into the neon perf counters view
// so that we don't need to introduce neon--1.x+1.sql to add a new view.
{
// Keeping this code in its own block to work around the C90 "don't mix declarations and code" rule when we define
// the `databricks_metrics` array in the next block. Yes, we are seriously dealing with C90 rules in 2025.
// Read safekeeper status from wal proposer shared memory first.
// Note that we are taking a mutex when reading from walproposer shared memory so that the total safekeeper count is
// consistent with the active wal acceptors count. Assuming that we don't query this view too often the mutex should
// not be a huge deal.
wp_shmem = GetWalpropShmemState();
SpinLockAcquire(&wp_shmem->mutex);
num_safekeepers = wp_shmem->num_safekeepers;
num_active_safekeepers = 0;
for (int i = 0; i < num_safekeepers; i++) {
if (wp_shmem->safekeeper_status[i] == 1) {
num_active_safekeepers++;
}
}
SpinLockRelease(&wp_shmem->mutex);
}
{
metric_t databricks_metrics[] = {
{"sql_index_corruption_count", false, 0, (double) pg_atomic_read_u32(&databricks_metrics_shared->index_corruption_count)},
{"sql_data_corruption_count", false, 0, (double) pg_atomic_read_u32(&databricks_metrics_shared->data_corruption_count)},
{"sql_internal_error_count", false, 0, (double) pg_atomic_read_u32(&databricks_metrics_shared->internal_error_count)},
{"ps_corruption_detected", false, 0, (double) pg_atomic_read_u32(&databricks_metrics_shared->ps_corruption_detected)},
{"num_active_safekeepers", false, 0.0, (double) num_active_safekeepers},
{"num_configured_safekeepers", false, 0.0, (double) num_safekeepers},
{NULL, false, 0, 0},
};
for (int i = 0; databricks_metrics[i].name != NULL; i++)

View File

@@ -154,7 +154,9 @@ WalProposerCreate(WalProposerConfig *config, walproposer_api api)
wp->safekeeper[wp->n_safekeepers].state = SS_OFFLINE;
wp->safekeeper[wp->n_safekeepers].active_state = SS_ACTIVE_SEND;
wp->safekeeper[wp->n_safekeepers].wp = wp;
/* BEGIN_HADRON */
wp->safekeeper[wp->n_safekeepers].index = wp->n_safekeepers;
/* END_HADRON */
{
Safekeeper *sk = &wp->safekeeper[wp->n_safekeepers];
int written = 0;
@@ -183,6 +185,10 @@ WalProposerCreate(WalProposerConfig *config, walproposer_api api)
if (wp->safekeepers_generation > INVALID_GENERATION && wp->config->proto_version < 3)
wp_log(FATAL, "enabling generations requires protocol version 3");
wp_log(LOG, "using safekeeper protocol version %d", wp->config->proto_version);
/* BEGIN_HADRON */
wp->api.reset_safekeeper_statuses_for_metrics(wp, wp->n_safekeepers);
/* END_HADRON */
/* Fill the greeting package */
wp->greetRequest.pam.tag = 'g';
@@ -355,6 +361,10 @@ ShutdownConnection(Safekeeper *sk)
sk->state = SS_OFFLINE;
sk->streamingAt = InvalidXLogRecPtr;
/* BEGIN_HADRON */
sk->wp->api.update_safekeeper_status_for_metrics(sk->wp, sk->index, 0);
/* END_HADRON */
MembershipConfigurationFree(&sk->greetResponse.mconf);
if (sk->voteResponse.termHistory.entries)
pfree(sk->voteResponse.termHistory.entries);
@@ -1530,6 +1540,10 @@ StartStreaming(Safekeeper *sk)
sk->active_state = SS_ACTIVE_SEND;
sk->streamingAt = sk->startStreamingAt;
/* BEGIN_HADRON */
sk->wp->api.update_safekeeper_status_for_metrics(sk->wp, sk->index, 1);
/* END_HADRON */
/*
* Donors can only be in SS_ACTIVE state, so we potentially update the
* donor when we switch one to SS_ACTIVE.

View File

@@ -432,6 +432,10 @@ typedef struct WalproposerShmemState
/* BEGIN_HADRON */
/* The WAL rate limiter */
WalRateLimiter wal_rate_limiter;
/* Number of safekeepers in the config */
uint32 num_safekeepers;
/* Per-safekeeper status flags: 0=inactive, 1=active */
uint8 safekeeper_status[MAX_SAFEKEEPERS];
/* END_HADRON */
} WalproposerShmemState;
@@ -483,6 +487,11 @@ typedef struct Safekeeper
char const *host;
char const *port;
/* BEGIN_HADRON */
/* index of this safekeeper in the WalProposer array */
uint32 index;
/* END_HADRON */
/*
* connection string for connecting/reconnecting.
*
@@ -731,6 +740,23 @@ typedef struct walproposer_api
* handled by elog().
*/
void (*log_internal) (WalProposer *wp, int level, const char *line);
/*
* BEGIN_HADRON
* APIs manipulating shared memory state used for Safekeeper quorum health metrics.
*/
/*
* Reset the safekeeper statuses in shared memory for metric purposes.
*/
void (*reset_safekeeper_statuses_for_metrics) (WalProposer *wp, uint32 num_safekeepers);
/*
* Update the safekeeper status in shared memory for metric purposes.
*/
void (*update_safekeeper_status_for_metrics) (WalProposer *wp, uint32 sk_index, uint8 status);
/* END_HADRON */
} walproposer_api;
/*

View File

@@ -2261,6 +2261,27 @@ GetNeonCurrentClusterSize(void)
}
uint64 GetNeonCurrentClusterSize(void);
/* BEGIN_HADRON */
static void
walprop_pg_reset_safekeeper_statuses_for_metrics(WalProposer *wp, uint32 num_safekeepers)
{
WalproposerShmemState* shmem = wp->api.get_shmem_state(wp);
SpinLockAcquire(&shmem->mutex);
shmem->num_safekeepers = num_safekeepers;
memset(shmem->safekeeper_status, 0, sizeof(shmem->safekeeper_status));
SpinLockRelease(&shmem->mutex);
}
static void
walprop_pg_update_safekeeper_status_for_metrics(WalProposer *wp, uint32 sk_index, uint8 status)
{
WalproposerShmemState* shmem = wp->api.get_shmem_state(wp);
Assert(sk_index < MAX_SAFEKEEPERS);
SpinLockAcquire(&shmem->mutex);
shmem->safekeeper_status[sk_index] = status;
SpinLockRelease(&shmem->mutex);
}
/* END_HADRON */
static const walproposer_api walprop_pg = {
.get_shmem_state = walprop_pg_get_shmem_state,
@@ -2294,4 +2315,6 @@ static const walproposer_api walprop_pg = {
.finish_sync_safekeepers = walprop_pg_finish_sync_safekeepers,
.process_safekeeper_feedback = walprop_pg_process_safekeeper_feedback,
.log_internal = walprop_pg_log_internal,
.reset_safekeeper_statuses_for_metrics = walprop_pg_reset_safekeeper_statuses_for_metrics,
.update_safekeeper_status_for_metrics = walprop_pg_update_safekeeper_status_for_metrics,
};

View File

@@ -458,7 +458,7 @@ pub(crate) enum LocalProxyConnError {
impl ReportableError for HttpConnError {
fn get_error_kind(&self) -> ErrorKind {
match self {
HttpConnError::ConnectError(_) => ErrorKind::Compute,
HttpConnError::ConnectError(e) => e.get_error_kind(),
HttpConnError::ConnectionClosedAbruptly(_) => ErrorKind::Compute,
HttpConnError::PostgresConnectionError(p) => match p.as_db_error() {
// user provided a wrong database name

View File

@@ -612,19 +612,25 @@ pub async fn handle_request(
}
}
let max_term = statuses
.iter()
.map(|(status, _)| status.acceptor_state.term)
.max()
.unwrap();
// Find the most advanced safekeeper
let (status, i) = statuses
.into_iter()
.max_by_key(|(status, _)| {
(
status.acceptor_state.epoch,
status.flush_lsn,
/* BEGIN_HADRON */
// We need to pull from the SK with the highest term.
// This is because another compute may come online and vote the same highest term again on the other two SKs.
// Then, there will be 2 computes running on the same term.
status.acceptor_state.term,
/* END_HADRON */
status.flush_lsn,
status.commit_lsn,
)
})
@@ -634,6 +640,22 @@ pub async fn handle_request(
assert!(status.tenant_id == request.tenant_id);
assert!(status.timeline_id == request.timeline_id);
// TODO(diko): This is hadron only check to make sure that we pull the timeline
// from the safekeeper with the highest term during timeline restore.
// We could avoid returning the error by calling bump_term after pull_timeline.
// However, this is not a big deal because we retry the pull_timeline requests.
// The check should be removed together with removing custom hadron logic for
// safekeeper restore.
if wait_for_peer_timeline_status && status.acceptor_state.term != max_term {
return Err(ApiError::PreconditionFailed(
format!(
"choosen safekeeper {} has term {}, but the most advanced term is {}",
safekeeper_host, status.acceptor_state.term, max_term
)
.into(),
));
}
match pull_timeline(
status,
safekeeper_host,

View File

@@ -195,12 +195,14 @@ impl StateSK {
to: Configuration,
) -> Result<TimelineMembershipSwitchResponse> {
let result = self.state_mut().membership_switch(to).await?;
let flush_lsn = self.flush_lsn();
let last_log_term = self.state().acceptor_state.get_last_log_term(flush_lsn);
Ok(TimelineMembershipSwitchResponse {
previous_conf: result.previous_conf,
current_conf: result.current_conf,
last_log_term: self.state().acceptor_state.term,
flush_lsn: self.flush_lsn(),
last_log_term,
flush_lsn,
})
}

View File

@@ -471,11 +471,17 @@ impl Persistence {
&self,
input_node_id: NodeId,
input_https_port: Option<u16>,
input_grpc_addr: Option<String>,
input_grpc_port: Option<u16>,
) -> DatabaseResult<()> {
use crate::schema::nodes::dsl::*;
self.update_node(
input_node_id,
listen_https_port.eq(input_https_port.map(|x| x as i32)),
(
listen_https_port.eq(input_https_port.map(|x| x as i32)),
listen_grpc_addr.eq(input_grpc_addr),
listen_grpc_port.eq(input_grpc_port.map(|x| x as i32)),
),
)
.await
}

View File

@@ -7813,7 +7813,7 @@ impl Service {
register_req.listen_https_port,
register_req.listen_pg_addr,
register_req.listen_pg_port,
register_req.listen_grpc_addr,
register_req.listen_grpc_addr.clone(),
register_req.listen_grpc_port,
register_req.availability_zone_id.clone(),
self.config.use_https_pageserver_api,
@@ -7848,6 +7848,8 @@ impl Service {
.update_node_on_registration(
register_req.node_id,
register_req.listen_https_port,
register_req.listen_grpc_addr,
register_req.listen_grpc_port,
)
.await?
}

View File

@@ -24,12 +24,12 @@ use pageserver_api::controller_api::{
};
use pageserver_api::models::{SafekeeperInfo, SafekeepersInfo, TimelineInfo};
use safekeeper_api::PgVersionId;
use safekeeper_api::Term;
use safekeeper_api::membership::{self, MemberSet, SafekeeperGeneration};
use safekeeper_api::models::{
PullTimelineRequest, TimelineLocateResponse, TimelineMembershipSwitchRequest,
TimelineMembershipSwitchResponse,
};
use safekeeper_api::{INITIAL_TERM, Term};
use safekeeper_client::mgmt_api;
use tokio::task::JoinSet;
use tokio_util::sync::CancellationToken;
@@ -1298,13 +1298,7 @@ impl Service {
)
.await?;
let mut sync_position = (INITIAL_TERM, Lsn::INVALID);
for res in results.into_iter().flatten() {
let sk_position = (res.last_log_term, res.flush_lsn);
if sync_position < sk_position {
sync_position = sk_position;
}
}
let sync_position = Self::get_sync_position(&results)?;
tracing::info!(
%generation,
@@ -1598,4 +1592,36 @@ impl Service {
Ok(())
}
/// Get membership switch responses from all safekeepers and return the sync position.
///
/// Sync position is a position equal or greater than the commit position.
/// It is guaranteed that all WAL entries with (last_log_term, flush_lsn)
/// greater than the sync position are not committed (= not on a quorum).
///
/// Returns error if there is no quorum of successful responses.
fn get_sync_position(
responses: &[mgmt_api::Result<TimelineMembershipSwitchResponse>],
) -> Result<(Term, Lsn), ApiError> {
let quorum_size = responses.len() / 2 + 1;
let mut wal_positions = responses
.iter()
.flatten()
.map(|res| (res.last_log_term, res.flush_lsn))
.collect::<Vec<_>>();
// Should be already checked if the responses are from tenant_timeline_set_membership_quorum.
if wal_positions.len() < quorum_size {
return Err(ApiError::InternalServerError(anyhow::anyhow!(
"not enough successful responses to get sync position: {}/{}",
wal_positions.len(),
quorum_size,
)));
}
wal_positions.sort();
Ok(wal_positions[quorum_size - 1])
}
}

View File

@@ -78,20 +78,26 @@ class EndpointHttpClient(requests.Session):
json: dict[str, str] = res.json()
return json
def prewarm_lfc(self, from_endpoint_id: str | None = None):
def prewarm_lfc(self, from_endpoint_id: str | None = None) -> dict[str, str]:
"""
Prewarm LFC cache from given endpoint and wait till it finishes or errors
"""
params = {"from_endpoint": from_endpoint_id} if from_endpoint_id else dict()
self.post(self.prewarm_url, params=params).raise_for_status()
self.prewarm_lfc_wait()
return self.prewarm_lfc_wait()
def prewarm_lfc_wait(self):
def cancel_prewarm_lfc(self):
"""
Cancel LFC prewarm if any is ongoing
"""
self.delete(self.prewarm_url).raise_for_status()
def prewarm_lfc_wait(self) -> dict[str, str]:
"""
Wait till LFC prewarm returns with error or success.
If prewarm was not requested before calling this function, it will error
"""
statuses = "failed", "completed", "skipped"
statuses = "failed", "completed", "skipped", "cancelled"
def prewarmed():
json = self.prewarm_lfc_status()
@@ -101,6 +107,7 @@ class EndpointHttpClient(requests.Session):
wait_until(prewarmed, timeout=60)
res = self.prewarm_lfc_status()
assert res["status"] != "failed", res
return res
def offload_lfc_status(self) -> dict[str, str]:
res = self.get(self.offload_url)
@@ -108,29 +115,31 @@ class EndpointHttpClient(requests.Session):
json: dict[str, str] = res.json()
return json
def offload_lfc(self):
def offload_lfc(self) -> dict[str, str]:
"""
Offload LFC cache to endpoint storage and wait till offload finishes or errors
"""
self.post(self.offload_url).raise_for_status()
self.offload_lfc_wait()
return self.offload_lfc_wait()
def offload_lfc_wait(self):
def offload_lfc_wait(self) -> dict[str, str]:
"""
Wait till LFC offload returns with error or success.
If offload was not requested before calling this function, it will error
"""
statuses = "failed", "completed", "skipped"
def offloaded():
json = self.offload_lfc_status()
status, err = json["status"], json.get("error")
assert status in ["failed", "completed"], f"{status}, {err=}"
assert status in statuses, f"{status}, {err=}"
wait_until(offloaded, timeout=60)
res = self.offload_lfc_status()
assert res["status"] != "failed", res
return res
def promote(self, promote_spec: dict[str, Any], disconnect: bool = False):
def promote(self, promote_spec: dict[str, Any], disconnect: bool = False) -> dict[str, str]:
url = f"http://localhost:{self.external_port}/promote"
if disconnect:
try: # send first request to start promote and disconnect

View File

@@ -79,6 +79,7 @@ class NeonAPI:
elif resp.status_code == 423 and resp.json()["message"] in {
"endpoint is in some transitive state, could not suspend",
"project already has running conflicting operations, scheduling of new ones is prohibited",
"snapshot is in transition",
}:
retry = True
self.retries4xx += 1
@@ -355,6 +356,63 @@ class NeonAPI:
return cast("dict[str, Any]", resp.json())
def create_snapshot(
self,
project_id: str,
branch_id: str,
lsn: str | None = None,
timestamp: str | None = None,
name: str | None = None,
expires_at: str | None = None,
) -> dict[str, Any]:
params: dict[str, Any] = {
"lsn": lsn,
"timestamp": timestamp,
"name": name,
"expires_at": expires_at,
}
params = {key: value for key, value in params.items() if value is not None}
resp = self.__request(
"POST",
f"/projects/{project_id}/branches/{branch_id}/snapshot",
params=params,
json={},
headers={
"Accept": "application/json",
},
)
return cast("dict[str, Any]", resp.json())
def delete_snapshot(self, project_id: str, snapshot_id: str) -> dict[str, Any]:
resp = self.__request("DELETE", f"/projects/{project_id}/snapshots/{snapshot_id}")
return cast("dict[str, Any]", resp.json())
def restore_snapshot(
self,
project_id: str,
snapshot_id: str,
target_branch_id: str,
name: str | None = None,
finalize_restore: bool = True,
) -> dict[str, Any]:
data: dict[str, Any] = {
"target_branch_id": target_branch_id,
"finalize_restore": finalize_restore,
}
if name is not None:
data["name"] = name
log.info("Restore snapshot data: %s", data)
resp = self.__request(
"POST",
f"/projects/{project_id}/snapshots/{snapshot_id}/restore",
json=data,
headers={
"Accept": "application/json",
"Content-Type": "application/json",
},
)
return cast("dict[str, Any]", resp.json())
def delete_endpoint(self, project_id: str, endpoint_id: str) -> dict[str, Any]:
resp = self.__request("DELETE", f"/projects/{project_id}/endpoints/{endpoint_id}")
return cast("dict[str,Any]", resp.json())
@@ -396,6 +454,14 @@ class NeonAPI:
return cast("dict[str, Any]", resp.json())
def get_branch_endpoints(self, project_id: str, branch_id: str) -> dict[str, Any]:
resp = self.__request(
"GET",
f"/projects/{project_id}/branches/{branch_id}/endpoints",
headers={"Accept": "application/json", "Content-Type": "application/json"},
)
return cast("dict[str, Any]", resp.json())
def get_endpoints(self, project_id: str) -> dict[str, Any]:
resp = self.__request(
"GET",

View File

@@ -262,7 +262,6 @@ class PgProtocol:
# pooler does not support statement_timeout
# Check if the hostname contains the string 'pooler'
hostname = result.get("host", "")
log.info(f"Hostname: {hostname}")
options = result.get("options", "")
if "statement_timeout" not in options and "pooler" not in hostname:
options = f"-cstatement_timeout=120s {options}"
@@ -2314,6 +2313,7 @@ class NeonStorageController(MetricsGetter, LogUtils):
timeline_id: TimelineId,
new_sk_set: list[int],
):
log.info(f"migrate_safekeepers({tenant_id}, {timeline_id}, {new_sk_set})")
response = self.request(
"POST",
f"{self.api}/v1/tenant/{tenant_id}/timeline/{timeline_id}/safekeeper_migrate",

View File

@@ -11,6 +11,7 @@ import time
from datetime import UTC, datetime, timedelta
from typing import TYPE_CHECKING, Any
import psycopg2
import pytest
from fixtures.log_helper import log
@@ -22,6 +23,29 @@ if TYPE_CHECKING:
from fixtures.pg_version import PgVersion
class NeonSnapshot:
"""
A snapshot of the Neon Branch
Gets the output of the API call af a snapshot creation
"""
def __init__(self, project: NeonProject, snapshot: dict[str, Any]):
self.project: NeonProject = project
snapshot = snapshot["snapshot"]
self.id: str = snapshot["id"]
self.name: str = snapshot["name"]
self.created_at: datetime = datetime.fromisoformat(snapshot["created_at"])
self.source_branch: NeonBranch = project.branches[snapshot["source_branch_id"]]
project.snapshots[self.id] = self
self.restored: bool = False
def __str__(self) -> str:
return f"id: {self.id}, name: {self.name}, created_at: {self.created_at}"
def delete(self) -> None:
self.project.delete_snapshot(self.id)
class NeonEndpoint:
"""
Neon Endpoint
@@ -70,6 +94,12 @@ class NeonBranch:
def __init__(self, project, branch: dict[str, Any], is_reset=False):
self.id: str = branch["branch"]["id"]
self.desc = branch
self.name: str | None = None
if "name" in branch["branch"]:
self.name = branch["branch"]["name"]
self.restored_from: str | None = None
if "restored_from" in branch["branch"]:
self.restored_from = branch["branch"]["restored_from"]
self.project: NeonProject = project
self.neon_api: NeonAPI = project.neon_api
self.project_id: str = branch["branch"]["project_id"]
@@ -113,10 +143,9 @@ class NeonBranch:
def __str__(self):
"""
Prints the branch's name with all the predecessors
(r) means the branch is a reset one
Prints the branch's information with all the predecessors
"""
return f"{self.id}{'(r)' if self.id in self.project.reset_branches else ''}, parent: {self.parent}"
return f"{self.id}{f'({self.name})' if self.name and self.name != self.id else ''}{f'(restored_from: {self.restored_from})' if self.restored_from else ''}, parent: {self.parent}"
def random_time(self) -> datetime:
min_time = max(
@@ -152,6 +181,9 @@ class NeonBranch:
self.project.terminate_benchmark(self.id)
def reset_to_parent(self) -> None:
"""
Resets the branch to the parent branch
"""
for ep in self.project.endpoints.values():
if ep.type == "read_only":
ep.terminate_benchmark()
@@ -240,6 +272,7 @@ class NeonProject:
# Leaf branches are the branches, which do not have children
self.leaf_branches: dict[str, NeonBranch] = {}
self.branches: dict[str, NeonBranch] = {}
self.branch_num: int = 0
self.reset_branches: set[str] = set()
self.main_branch: NeonBranch = NeonBranch(self, proj)
self.main_branch.connection_parameters = self.connection_parameters
@@ -253,6 +286,8 @@ class NeonProject:
self.limits: dict[str, Any] = self.get_limits()["limits"]
self.read_only_endpoints_total: int = 0
self.min_time: datetime = datetime.now(UTC)
self.snapshots: dict[str, NeonSnapshot] = {}
self.snapshot_num: int = 0
def get_limits(self) -> dict[str, Any]:
return self.neon_api.get_project_limits(self.id)
@@ -280,7 +315,10 @@ class NeonProject:
return False
def create_branch(
self, parent_id: str | None = None, parent_timestamp: datetime | None = None
self,
parent_id: str | None = None,
parent_timestamp: datetime | None = None,
is_reset: bool = False,
) -> NeonBranch | None:
self.wait()
if not self.check_limit_branches():
@@ -293,14 +331,14 @@ class NeonProject:
branch_def = self.neon_api.create_branch(
self.id, parent_id=parent_id, parent_timestamp=parent_timestamp_str
)
new_branch = NeonBranch(self, branch_def)
new_branch = NeonBranch(self, branch_def, is_reset)
self.wait()
return new_branch
def delete_branch(self, branch_id: str) -> None:
parent = self.branches[branch_id].parent
if not parent or branch_id == self.main_branch.id:
raise RuntimeError("Cannot delete the main branch")
raise RuntimeError("Cannot delete the main branch or a branch restored from a snapshot")
if branch_id not in self.leaf_branches and branch_id not in self.reset_branches:
raise RuntimeError(f"The branch {branch_id}, probably, has ancestors")
if branch_id not in self.branches:
@@ -313,7 +351,7 @@ class NeonProject:
if branch_id not in self.reset_branches:
self.terminate_benchmark(branch_id)
self.neon_api.delete_branch(self.id, branch_id)
if len(parent.children) == 1 and parent.id != self.main_branch.id:
if len(parent.children) == 1 and parent.parent is not None:
self.leaf_branches[parent.id] = parent
parent.children.pop(branch_id)
if branch_id in self.leaf_branches:
@@ -333,6 +371,22 @@ class NeonProject:
log.info("No leaf branches found")
return target
def get_random_parent_branch(self) -> NeonBranch:
return self.branches[random.choice(list(set(self.branches.keys()) - self.reset_branches))]
def gen_branch_name(self) -> str:
self.branch_num += 1
return f"branch{self.branch_num}"
def get_random_snapshot(self) -> NeonSnapshot | None:
snapshot: NeonSnapshot | None = None
avail_snapshots = [sn for sn in self.snapshots.values() if not sn.restored]
if avail_snapshots:
snapshot = random.choice(avail_snapshots)
else:
log.info("No snapshots found")
return snapshot
def delete_endpoint(self, endpoint_id: str) -> None:
self.terminate_benchmark(endpoint_id)
self.neon_api.delete_endpoint(self.id, endpoint_id)
@@ -409,6 +463,116 @@ class NeonProject:
self.restore_num += 1
return f"restore{self.restore_num}"
def gen_snapshot_name(self) -> str:
self.snapshot_num += 1
return f"snapshot{self.snapshot_num}"
def create_snapshot(
self,
lsn: str | None = None,
timestamp: datetime | None = None,
) -> NeonSnapshot:
"""
Create a new Neon snapshot for the current project
Two optional arguments: lsn and timestamp are mutually exclusive
they instruct to create a snapshot with the specific lns or timestamp
"""
snapshot_name = self.gen_snapshot_name()
with psycopg2.connect(self.connection_uri) as conn:
with conn.cursor() as cur:
# We will check the value we set now after the snapshot restored to verify consistency
cur.execute(
f"INSERT INTO sanity_check (name, value) VALUES "
f"('snapsot_name', '{snapshot_name}') ON CONFLICT (name) DO UPDATE SET value = EXCLUDED.value"
)
conn.commit()
snapshot = NeonSnapshot(
self,
self.neon_api.create_snapshot(
self.id,
self.main_branch.id,
lsn,
timestamp.isoformat().replace("+00:00", "Z") if timestamp else None,
snapshot_name,
),
)
self.wait()
# Now we taint the value after the snapshot was taken
cur.execute("UPDATE sanity_check SET value = 'tainted' || value")
conn.commit()
return snapshot
def delete_snapshot(self, snapshot_id: str) -> None:
"""
Deletes the snapshot with the given id
"""
self.wait()
self.neon_api.delete_snapshot(self.id, snapshot_id)
self.snapshots.pop(snapshot_id)
self.wait()
def restore_snapshot(self, snapshot_id: str) -> NeonBranch | None:
"""
Creates a new Neon branch for the current project, then restores the snapshot
with the given id
"""
target_branch = self.get_random_parent_branch().create_child_branch()
if not target_branch:
return None
self.snapshots[snapshot_id].restored = True
new_branch_def: dict[str, Any] = self.neon_api.restore_snapshot(
self.id,
snapshot_id,
target_branch.id,
self.gen_branch_name(),
)
self.wait()
new_branch_def = self.neon_api.get_branch_details(self.id, new_branch_def["branch"]["id"])
# The restored branch will lose the parent afterward, but it has it during the restoration.
# So, we delete parent_id
new_branch_def["branch"].pop("parent_id")
new_branch = NeonBranch(self, new_branch_def)
log.info("Restored snapshot to the branch: %s", new_branch)
target_branch_def = self.neon_api.get_branch_details(self.id, target_branch.id)
if "name" in target_branch_def["branch"]:
target_branch.name = target_branch_def["branch"]["name"]
if new_branch.connection_parameters is None:
if not new_branch.endpoints:
for ep in self.neon_api.get_branch_endpoints(self.id, new_branch.id)["endpoints"]:
if ep["id"] not in self.endpoints:
NeonEndpoint(self, ep)
new_branch.connection_parameters = self.connection_parameters.copy()
for ep in new_branch.endpoints.values():
if ep.type == "read_write":
new_branch.connection_parameters["host"] = ep.host
break
new_branch.connect_env = {
"PGHOST": new_branch.connection_parameters["host"],
"PGUSER": new_branch.connection_parameters["role"],
"PGDATABASE": new_branch.connection_parameters["database"],
"PGPASSWORD": new_branch.connection_parameters["password"],
"PGSSLMODE": "require",
}
with psycopg2.connect(
host=new_branch.connection_parameters["host"],
port=5432,
user=new_branch.connection_parameters["role"],
password=new_branch.connection_parameters["password"],
database=new_branch.connection_parameters["database"],
) as conn:
with conn.cursor() as cur:
cur.execute("SELECT value FROM sanity_check WHERE name = 'snapsot_name'")
snapshot_name = None
if row := cur.fetchone():
snapshot_name = row[0]
# We verify here that the value we select from the table matches with the snapshot name
# To ensure consistency
assert snapshot_name == self.snapshots[snapshot_id].name
self.wait()
target_branch.start_benchmark()
new_branch.start_benchmark()
return new_branch
@pytest.fixture()
def setup_class(
@@ -438,9 +602,7 @@ def do_action(project: NeonProject, action: str) -> bool:
if action == "new_branch" or action == "new_branch_random_time":
use_random_time: bool = action == "new_branch_random_time"
log.info("Trying to create a new branch %s", "random time" if use_random_time else "")
parent = project.branches[
random.choice(list(set(project.branches.keys()) - project.reset_branches))
]
parent = project.get_random_parent_branch()
child = parent.create_child_branch(parent.random_time() if use_random_time else None)
if child is None:
return False
@@ -479,6 +641,23 @@ def do_action(project: NeonProject, action: str) -> bool:
return False
log.info("Reset to parent %s", target)
target.reset_to_parent()
elif action == "create_snapshot":
snapshot = project.create_snapshot()
if snapshot is None:
return False
log.info("Created snapshot %s", snapshot)
elif action == "restore_snapshot":
if (snapshot_to_restore := project.get_random_snapshot()) is None:
return False
log.info("Restoring snapshot %s", snapshot_to_restore)
if project.restore_snapshot(snapshot_to_restore.id) is None:
return False
elif action == "delete_snapshot":
snapshot_to_delete = project.get_random_snapshot()
if snapshot_to_delete is None:
return False
snapshot_to_delete.delete()
log.info("Deleted snapshot %s", snapshot_to_delete)
else:
raise ValueError(f"The action {action} is unknown")
return True
@@ -512,12 +691,28 @@ def test_api_random(
("delete_branch", 1.2),
("restore_random_time", 0.9),
("reset_to_parent", 0.3),
("create_snapshot", 0.2),
("restore_snapshot", 0.1),
("delete_snapshot", 0.1),
)
if num_ops_env := os.getenv("NUM_OPERATIONS"):
num_operations = int(num_ops_env)
else:
num_operations = 250
pg_bin.run(["pgbench", "-i", "-I", "dtGvp", "-s100"], env=project.main_branch.connect_env)
# Create a table for sanity check
# We are going to leve some control values there to check, e.g., after restoring a snapshot
pg_bin.run(
[
"psql",
"-c",
"CREATE TABLE IF NOT EXISTS sanity_check (name VARCHAR NOT NULL PRIMARY KEY, value VARCHAR)",
],
env=project.main_branch.connect_env,
)
# To not go to the past where pgbench tables do not exist
time.sleep(1)
project.min_time = datetime.now(UTC)
# To not go to the past where pgbench tables do not exist
time.sleep(1)
project.min_time = datetime.now(UTC)

View File

@@ -863,7 +863,6 @@ def test_pageserver_compaction_circuit_breaker(neon_env_builder: NeonEnvBuilder)
assert not env.pageserver.log_contains(".*Circuit breaker failure ended.*")
@pytest.mark.skip(reason="Lakebase mode")
def test_ps_corruption_detection_feedback(neon_env_builder: NeonEnvBuilder):
"""
Test that when the pageserver detects corruption during image layer creation,
@@ -890,7 +889,9 @@ def test_ps_corruption_detection_feedback(neon_env_builder: NeonEnvBuilder):
timeline_id = env.initial_timeline
pageserver_http = env.pageserver.http_client()
workload = Workload(env, tenant_id, timeline_id)
workload = Workload(
env, tenant_id, timeline_id, endpoint_opts={"config_lines": ["neon.lakebase_mode=true"]}
)
workload.init()
# Enable the failpoint that will cause image layer creation to fail due to a (simulated) detected

View File

@@ -1,6 +1,6 @@
import random
import threading
from enum import StrEnum
from threading import Thread
from time import sleep
from typing import Any
@@ -47,19 +47,23 @@ def offload_lfc(method: PrewarmMethod, client: EndpointHttpClient, cur: Cursor)
# With autoprewarm, we need to be sure LFC was offloaded after all writes
# finish, so we sleep. Otherwise we'll have less prewarmed pages than we want
sleep(AUTOOFFLOAD_INTERVAL_SECS)
client.offload_lfc_wait()
return
offload_res = client.offload_lfc_wait()
log.info(offload_res)
return offload_res
if method == PrewarmMethod.COMPUTE_CTL:
status = client.prewarm_lfc_status()
assert status["status"] == "not_prewarmed"
assert "error" not in status
client.offload_lfc()
offload_res = client.offload_lfc()
log.info(offload_res)
assert client.prewarm_lfc_status()["status"] == "not_prewarmed"
parsed = prom_parse(client)
desired = {OFFLOAD_LABEL: 1, PREWARM_LABEL: 0, OFFLOAD_ERR_LABEL: 0, PREWARM_ERR_LABEL: 0}
assert parsed == desired, f"{parsed=} != {desired=}"
return
return offload_res
raise AssertionError(f"{method} not in PrewarmMethod")
@@ -68,21 +72,30 @@ def prewarm_endpoint(
method: PrewarmMethod, client: EndpointHttpClient, cur: Cursor, lfc_state: str | None
):
if method == PrewarmMethod.AUTOPREWARM:
client.prewarm_lfc_wait()
prewarm_res = client.prewarm_lfc_wait()
log.info(prewarm_res)
elif method == PrewarmMethod.COMPUTE_CTL:
client.prewarm_lfc()
prewarm_res = client.prewarm_lfc()
log.info(prewarm_res)
return prewarm_res
elif method == PrewarmMethod.POSTGRES:
cur.execute("select neon.prewarm_local_cache(%s)", (lfc_state,))
def check_prewarmed(
def check_prewarmed_contains(
method: PrewarmMethod, client: EndpointHttpClient, desired_status: dict[str, str | int]
):
if method == PrewarmMethod.AUTOPREWARM:
assert client.prewarm_lfc_status() == desired_status
prewarm_status = client.prewarm_lfc_status()
for k in desired_status:
assert desired_status[k] == prewarm_status[k]
assert prom_parse(client)[PREWARM_LABEL] == 1
elif method == PrewarmMethod.COMPUTE_CTL:
assert client.prewarm_lfc_status() == desired_status
prewarm_status = client.prewarm_lfc_status()
for k in desired_status:
assert desired_status[k] == prewarm_status[k]
desired = {OFFLOAD_LABEL: 0, PREWARM_LABEL: 1, PREWARM_ERR_LABEL: 0, OFFLOAD_ERR_LABEL: 0}
assert prom_parse(client) == desired
@@ -149,9 +162,6 @@ def test_lfc_prewarm(neon_simple_env: NeonEnv, method: PrewarmMethod):
log.info(f"Used LFC size: {lfc_used_pages}")
pg_cur.execute("select * from neon.get_prewarm_info()")
total, prewarmed, skipped, _ = pg_cur.fetchall()[0]
log.info(f"Prewarm info: {total=} {prewarmed=} {skipped=}")
progress = (prewarmed + skipped) * 100 // total
log.info(f"Prewarm progress: {progress}%")
assert lfc_used_pages > 10000
assert total > 0
assert prewarmed > 0
@@ -161,7 +171,54 @@ def test_lfc_prewarm(neon_simple_env: NeonEnv, method: PrewarmMethod):
assert lfc_cur.fetchall()[0][0] == n_records * (n_records + 1) / 2
desired = {"status": "completed", "total": total, "prewarmed": prewarmed, "skipped": skipped}
check_prewarmed(method, client, desired)
check_prewarmed_contains(method, client, desired)
@pytest.mark.skipif(not USE_LFC, reason="LFC is disabled, skipping")
def test_lfc_prewarm_cancel(neon_simple_env: NeonEnv):
"""
Test we can cancel LFC prewarm and prewarm successfully after
"""
env = neon_simple_env
n_records = 1000000
cfg = [
"autovacuum = off",
"shared_buffers=1MB",
"neon.max_file_cache_size=1GB",
"neon.file_cache_size_limit=1GB",
"neon.file_cache_prewarm_limit=1000",
]
endpoint = env.endpoints.create_start(branch_name="main", config_lines=cfg)
pg_conn = endpoint.connect()
pg_cur = pg_conn.cursor()
pg_cur.execute("create schema neon; create extension neon with schema neon")
pg_cur.execute("create database lfc")
lfc_conn = endpoint.connect(dbname="lfc")
lfc_cur = lfc_conn.cursor()
log.info(f"Inserting {n_records} rows")
lfc_cur.execute("create table t(pk integer primary key, payload text default repeat('?', 128))")
lfc_cur.execute(f"insert into t (pk) values (generate_series(1,{n_records}))")
log.info(f"Inserted {n_records} rows")
client = endpoint.http_client()
method = PrewarmMethod.COMPUTE_CTL
offload_lfc(method, client, pg_cur)
endpoint.stop()
endpoint.start()
thread = Thread(target=lambda: prewarm_endpoint(method, client, pg_cur, None))
thread.start()
# wait 2 seconds to ensure we cancel prewarm SQL query
sleep(2)
client.cancel_prewarm_lfc()
thread.join()
assert client.prewarm_lfc_status()["status"] == "cancelled"
prewarm_endpoint(method, client, pg_cur, None)
assert client.prewarm_lfc_status()["status"] == "completed"
@pytest.mark.skipif(not USE_LFC, reason="LFC is disabled, skipping")
@@ -178,9 +235,8 @@ def test_lfc_prewarm_empty(neon_simple_env: NeonEnv):
cur = conn.cursor()
cur.execute("create schema neon; create extension neon with schema neon")
method = PrewarmMethod.COMPUTE_CTL
offload_lfc(method, client, cur)
prewarm_endpoint(method, client, cur, None)
assert client.prewarm_lfc_status()["status"] == "skipped"
assert offload_lfc(method, client, cur)["status"] == "skipped"
assert prewarm_endpoint(method, client, cur, None)["status"] == "skipped"
# autoprewarm isn't needed as we prewarm manually
@@ -251,11 +307,11 @@ def test_lfc_prewarm_under_workload(neon_simple_env: NeonEnv, method: PrewarmMet
workload_threads = []
for _ in range(n_threads):
t = threading.Thread(target=workload)
t = Thread(target=workload)
workload_threads.append(t)
t.start()
prewarm_thread = threading.Thread(target=prewarm)
prewarm_thread = Thread(target=prewarm)
prewarm_thread.start()
def prewarmed():

View File

@@ -286,3 +286,177 @@ def test_sk_generation_aware_tombstones(neon_env_builder: NeonEnvBuilder):
assert re.match(r".*Timeline .* deleted.*", exc.value.response.text)
# The timeline should remain deleted.
expect_deleted(second_sk)
def test_safekeeper_migration_stale_timeline(neon_env_builder: NeonEnvBuilder):
"""
Test that safekeeper migration handles stale timeline correctly by migrating to
a safekeeper with a stale timeline.
1. Check that we are waiting for the stale timeline to catch up with the commit lsn.
The migration might fail if there is no compute to advance the WAL.
2. Check that we rely on last_log_term (and not the current term) when waiting for the
sync_position on step 7.
3. Check that migration succeeds if the compute is running.
"""
neon_env_builder.num_safekeepers = 2
neon_env_builder.storage_controller_config = {
"timelines_onto_safekeepers": True,
"timeline_safekeeper_count": 1,
}
env = neon_env_builder.init_start()
env.pageserver.allowed_errors.extend(PAGESERVER_ALLOWED_ERRORS)
env.storage_controller.allowed_errors.append(".*not enough successful .* to reach quorum.*")
mconf = env.storage_controller.timeline_locate(env.initial_tenant, env.initial_timeline)
active_sk = env.get_safekeeper(mconf["sk_set"][0])
other_sk = [sk for sk in env.safekeepers if sk.id != active_sk.id][0]
ep = env.endpoints.create("main", tenant_id=env.initial_tenant)
ep.start(safekeeper_generation=1, safekeepers=[active_sk.id])
ep.safe_psql("CREATE TABLE t(a int)")
ep.safe_psql("INSERT INTO t VALUES (0)")
# Pull the timeline to other_sk, so other_sk now has a "stale" timeline on it.
other_sk.pull_timeline([active_sk], env.initial_tenant, env.initial_timeline)
# Advance the WAL on active_sk.
ep.safe_psql("INSERT INTO t VALUES (1)")
# The test is more tricky if we have the same last_log_term but different term/flush_lsn.
# Stop the active_sk during the endpoint shutdown because otherwise compute_ctl runs
# sync_safekeepers and advances last_log_term on active_sk.
active_sk.stop()
ep.stop(mode="immediate")
active_sk.start()
active_sk_status = active_sk.http_client().timeline_status(
env.initial_tenant, env.initial_timeline
)
other_sk_status = other_sk.http_client().timeline_status(
env.initial_tenant, env.initial_timeline
)
# other_sk should have the same last_log_term, but a stale flush_lsn.
assert active_sk_status.last_log_term == other_sk_status.last_log_term
assert active_sk_status.flush_lsn > other_sk_status.flush_lsn
commit_lsn = active_sk_status.flush_lsn
# Bump the term on other_sk to make it higher than active_sk.
# This is to make sure we don't use current term instead of last_log_term in the algorithm.
other_sk.http_client().term_bump(
env.initial_tenant, env.initial_timeline, active_sk_status.term + 100
)
# TODO(diko): now it fails because the timeline on other_sk is stale and there is no compute
# to catch up it with active_sk. It might be fixed in https://databricks.atlassian.net/browse/LKB-946
# if we delete stale timelines before starting the migration.
# But the rest of the test is still valid: we should not lose committed WAL after the migration.
with pytest.raises(
StorageControllerApiException, match="not enough successful .* to reach quorum"
):
env.storage_controller.migrate_safekeepers(
env.initial_tenant, env.initial_timeline, [other_sk.id]
)
mconf = env.storage_controller.timeline_locate(env.initial_tenant, env.initial_timeline)
assert mconf["new_sk_set"] == [other_sk.id]
assert mconf["sk_set"] == [active_sk.id]
assert mconf["generation"] == 2
# Start the endpoint, so it advances the WAL on other_sk.
ep.start(safekeeper_generation=2, safekeepers=[active_sk.id, other_sk.id])
# Now the migration should succeed.
env.storage_controller.migrate_safekeepers(
env.initial_tenant, env.initial_timeline, [other_sk.id]
)
# Check that we didn't lose committed WAL.
assert (
other_sk.http_client().timeline_status(env.initial_tenant, env.initial_timeline).flush_lsn
>= commit_lsn
)
assert ep.safe_psql("SELECT * FROM t") == [(0,), (1,)]
def test_pull_from_most_advanced_sk(neon_env_builder: NeonEnvBuilder):
"""
Test that we pull the timeline from the most advanced safekeeper during the
migration and do not lose committed WAL.
"""
neon_env_builder.num_safekeepers = 4
neon_env_builder.storage_controller_config = {
"timelines_onto_safekeepers": True,
"timeline_safekeeper_count": 3,
}
env = neon_env_builder.init_start()
env.pageserver.allowed_errors.extend(PAGESERVER_ALLOWED_ERRORS)
mconf = env.storage_controller.timeline_locate(env.initial_tenant, env.initial_timeline)
sk_set = mconf["sk_set"]
assert len(sk_set) == 3
other_sk = [sk.id for sk in env.safekeepers if sk.id not in sk_set][0]
ep = env.endpoints.create("main", tenant_id=env.initial_tenant)
ep.start(safekeeper_generation=1, safekeepers=sk_set)
ep.safe_psql("CREATE TABLE t(a int)")
ep.safe_psql("INSERT INTO t VALUES (0)")
# Stop one sk, so we have a lagging WAL on it.
env.get_safekeeper(sk_set[0]).stop()
# Advance the WAL on the other sks.
ep.safe_psql("INSERT INTO t VALUES (1)")
# Stop other sks to make sure compute_ctl doesn't advance the last_log_term on them during shutdown.
for sk_id in sk_set[1:]:
env.get_safekeeper(sk_id).stop()
ep.stop(mode="immediate")
for sk_id in sk_set:
env.get_safekeeper(sk_id).start()
# Bump the term on the lagging sk to make sure we don't use it to choose the most advanced sk.
env.get_safekeeper(sk_set[0]).http_client().term_bump(
env.initial_tenant, env.initial_timeline, 100
)
def get_commit_lsn(sk_set: list[int]):
flush_lsns = []
last_log_terms = []
for sk_id in sk_set:
sk = env.get_safekeeper(sk_id)
status = sk.http_client().timeline_status(env.initial_tenant, env.initial_timeline)
flush_lsns.append(status.flush_lsn)
last_log_terms.append(status.last_log_term)
# In this test we assume that all sks have the same last_log_term.
assert len(set(last_log_terms)) == 1
flush_lsns.sort(reverse=True)
commit_lsn = flush_lsns[len(sk_set) // 2]
log.info(f"sk_set: {sk_set}, flush_lsns: {flush_lsns}, commit_lsn: {commit_lsn}")
return commit_lsn
commit_lsn_before_migration = get_commit_lsn(sk_set)
# Make two migrations, so the lagging sk stays in the sk_set, but other sks are replaced.
new_sk_set1 = [sk_set[0], sk_set[1], other_sk] # remove sk_set[2], add other_sk
new_sk_set2 = [sk_set[0], other_sk, sk_set[2]] # remove sk_set[1], add sk_set[2] back
env.storage_controller.migrate_safekeepers(
env.initial_tenant, env.initial_timeline, new_sk_set1
)
env.storage_controller.migrate_safekeepers(
env.initial_tenant, env.initial_timeline, new_sk_set2
)
commit_lsn_after_migration = get_commit_lsn(new_sk_set2)
# We should not lose committed WAL.
# If we have choosen the lagging sk to pull the timeline from, this might fail.
assert commit_lsn_before_migration <= commit_lsn_after_migration
ep.start(safekeeper_generation=5, safekeepers=new_sk_set2)
assert ep.safe_psql("SELECT * FROM t") == [(0,), (1,)]

View File

@@ -2757,18 +2757,37 @@ def test_timeline_disk_usage_limit(neon_env_builder: NeonEnvBuilder):
remote_storage_kind = s3_storage()
neon_env_builder.enable_safekeeper_remote_storage(remote_storage_kind)
# Set a very small disk usage limit (1KB)
neon_env_builder.safekeeper_extra_opts = ["--max-timeline-disk-usage-bytes=1024"]
env = neon_env_builder.init_start()
# Create a timeline and endpoint
env.create_branch("test_timeline_disk_usage_limit")
endpoint = env.endpoints.create_start("test_timeline_disk_usage_limit")
endpoint = env.endpoints.create_start(
"test_timeline_disk_usage_limit",
config_lines=[
"neon.lakebase_mode=true",
],
)
# Install the neon extension in the test database. We need it to query perf counter metrics.
with closing(endpoint.connect()) as conn:
with conn.cursor() as cur:
cur.execute("CREATE EXTENSION IF NOT EXISTS neon")
# Sanity-check safekeeper connection status in neon_perf_counters in the happy case.
cur.execute(
"SELECT value FROM neon_perf_counters WHERE metric = 'num_active_safekeepers'"
)
assert cur.fetchone() == (1,), "Expected 1 active safekeeper"
cur.execute(
"SELECT value FROM neon_perf_counters WHERE metric = 'num_configured_safekeepers'"
)
assert cur.fetchone() == (1,), "Expected 1 configured safekeeper"
# Get the safekeeper
sk = env.safekeepers[0]
# Restart the safekeeper with a very small disk usage limit (1KB)
sk.stop().start(["--max-timeline-disk-usage-bytes=1024"])
# Inject a failpoint to stop WAL backup
with sk.http_client() as http_cli:
http_cli.configure_failpoints([("backup-lsn-range-pausable", "pause")])
@@ -2794,6 +2813,18 @@ def test_timeline_disk_usage_limit(neon_env_builder: NeonEnvBuilder):
wait_until(error_logged)
log.info("Found expected error message in compute log, resuming.")
with closing(endpoint.connect()) as conn:
with conn.cursor() as cur:
# Confirm that neon_perf_counters also indicates that there are no active safekeepers
cur.execute(
"SELECT value FROM neon_perf_counters WHERE metric = 'num_active_safekeepers'"
)
assert cur.fetchone() == (0,), "Expected 0 active safekeepers"
cur.execute(
"SELECT value FROM neon_perf_counters WHERE metric = 'num_configured_safekeepers'"
)
assert cur.fetchone() == (1,), "Expected 1 configured safekeeper"
# Sanity check that the hanging insert is indeed still hanging. Otherwise means the circuit breaker we
# implemented didn't work as expected.
time.sleep(2)

View File

@@ -1,18 +1,18 @@
{
"v17": [
"17.5",
"fa1788475e3146cc9c7c6a1b74f48fd296898fcd"
"1e01fcea2a6b38180021aa83e0051d95286d9096"
],
"v16": [
"16.9",
"9b9cb4b3e33347aea8f61e606bb6569979516de5"
"a42351fcd41ea01edede1daed65f651e838988fc"
],
"v15": [
"15.13",
"aaaeff2550d5deba58847f112af9b98fa3a58b00"
"2aaab3bb4a13557aae05bb2ae0ef0a132d0c4f85"
],
"v14": [
"14.18",
"c9f9fdd0113b52c0bd535afdb09d3a543aeee25f"
"2155cb165d05f617eb2c8ad7e43367189b627703"
]
}