Compare commits

..

10 Commits

Author SHA1 Message Date
Vlad Lazar
db95540975 pageserver: handle empty get vectored queries (#11652)
## Problem

If all batched requests are excluded from the query by
`Timeline::get_rel_page_at_lsn_batched` (e.g. because they are past the
end of the relation), the read path would panic since it doesn't expect
empty queries. This is a change in behaviour that was introduced with
the scattered query implementation.

## Summary of Changes

Handle empty queries explicitly.
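
A minimal self-contained sketch of the idea, using hypothetical stand-in types rather than the pageserver's real read-path structs: if every request in the batch was filtered out, return an empty result instead of handing the query to the scattered-read machinery, which does not expect empty queries.

```rust
// Hypothetical sketch, not the pageserver's actual types.
struct BatchedQuery {
    pages: Vec<u32>, // stand-in for the batched page requests
}

fn get_rel_page_at_lsn_batched(query: &BatchedQuery) -> Vec<Vec<u8>> {
    if query.pages.is_empty() {
        // Previously this fell through into the scattered-query code,
        // which panicked on an empty query.
        return Vec::new();
    }
    // ... perform the actual vectored read for the remaining requests ...
    query.pages.iter().map(|_| Vec::new()).collect()
}

fn main() {
    let empty = BatchedQuery { pages: Vec::new() };
    assert!(get_rel_page_at_lsn_batched(&empty).is_empty());
}
```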
2025-04-21 15:38:44 -04:00
JC Grünhage
90033fe693 fix(ci): set token for fast-forward failure comments and allow merging with state unstable (#11647)
## Problem

https://github.com/neondatabase/neon/actions/runs/14538136318/job/40790985693?pr=11645
failed, even though the relevant parts of CI had passed and auto-merge
determined the PR was ready to merge. After that, commenting on the
failure also failed, because no token was set for the `gh` call.

## Summary of changes
- set GH_TOKEN for commenting after fast-forward failure
- allow merging with mergeable_state unstable
2025-04-21 15:38:44 -04:00
JC Grünhage
cb9d439cc1 fix(ci): make regex to find rc branches less strict (#11646)
## Problem

https://github.com/neondatabase/neon/actions/runs/14537161022/job/40787763965
failed to find the correct RC PR run, preventing artifact re-use. This
broke in https://github.com/neondatabase/neon/pull/11547.

There's a hotfix release containing this in
https://github.com/neondatabase/neon/pull/11645.

## Summary of changes
Make the regex for finding the RC PR run less strict; it was needlessly
precise.
2025-04-21 15:38:44 -04:00
Alex Chi Z.
5073e46df4 feat(pageserver): use rfc3339 time and print ratio in gc-compact stats (#11638)
## Problem

follow-up on https://github.com/neondatabase/neon/pull/11601

## Summary of changes

- serialize the start/end time as an RFC 3339 string
- compute the size ratio of the compaction (see the sketch below)
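
As an illustration only (hypothetical names, assuming `chrono` for the timestamps; this is not the actual gc-compaction stats struct), the two changes amount to something like:

```rust
// Illustrative sketch, not pageserver code.
use chrono::{DateTime, Utc};

struct GcCompactionStats {
    start: DateTime<Utc>,
    end: DateTime<Utc>,
    input_bytes: u64,
    output_bytes: u64,
}

impl GcCompactionStats {
    fn report(&self) -> String {
        // RFC 3339 strings are human-readable, unlike a raw epoch value.
        let ratio = self.output_bytes as f64 / self.input_bytes as f64;
        format!(
            "start={} end={} size_ratio={:.2}",
            self.start.to_rfc3339(),
            self.end.to_rfc3339(),
            ratio
        )
    }
}

fn main() {
    let now = Utc::now();
    let stats = GcCompactionStats {
        start: now,
        end: now,
        input_bytes: 4 << 30,  // 4 GiB of input layers
        output_bytes: 1 << 30, // 1 GiB after compaction
    };
    println!("{}", stats.report());
}
```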

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-04-18 05:28:01 +00:00
Alexander Bayandin
182bd95a4e CI(regress-tests): run tests on large-metal (#11634)
## Problem

Regression tests are flakier on virtualised (`qemu-x64-*`) runners.

See https://neondb.slack.com/archives/C069Z2199DL/p1744891865307769
Ref https://github.com/neondatabase/neon/issues/11627

## Summary of changes
- Switch `regress-tests` to metal-only large runners to mitigate flaky
behaviour
2025-04-18 01:25:38 +00:00
Anastasia Lubennikova
ce7795a67d compute: use project_id, endpoint_id as tag (#11556)
Use `project_id`/`endpoint_id` as the tag for compute audit logs.

part of https://github.com/neondatabase/cloud/issues/21955
2025-04-17 23:32:38 +00:00
Suhas Thalanki
134d01c771 remove pg_anon.patch (#11636)
This PR removes `pg_anon.patch`, as the `anon` v1 extension has been
removed and the patch is no longer used anywhere.
2025-04-17 22:08:16 +00:00
Arpad Müller
c1e4befd56 Additional fixes and improvements to storcon safekeeper timelines (#11477)
This delivers some additional fixes and improvements to storcon-managed
safekeeper timelines:

* use `i32::MAX` as the generation number for timeline deletion
* start the generation for new timelines at 1 instead of 0: this ensures
that the other components are actually generation-enabled (see the
sketch after this list)
* fix the database operations we use for metrics
* use a join in `list_pending_ops` to prevent the classic ORM issue where
one issues many DB queries
* use enums in `test_storcon_create_delete_sk_down`: we are adding a
second parameter, and having two bool parameters is weird
* extend `test_storcon_create_delete_sk_down` with a test of whole-tenant
deletion; this hasn't been tested before
* remove some redundant logging contexts
* don't require mutable access to the service lock for scheduling
pending ops in memory: to pull this off, create reconcilers eagerly,
so mutable access to the service lock is no longer needed
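
A tiny sketch of the generation convention from the first two bullets, with hypothetical constants rather than storcon's real types: 0 reads as "not generation-enabled" to other components, so fresh timelines start at 1, and deletion is tagged with `i32::MAX`.

```rust
// Hypothetical illustration of the convention; storcon's real code differs.
const TIMELINE_DELETED_GENERATION: i32 = i32::MAX;

/// New safekeeper timelines start at generation 1, never 0: other components
/// treat generation 0 as "not generation-enabled".
fn initial_generation() -> i32 {
    1
}

fn main() {
    assert!(initial_generation() > 0);
    assert!(TIMELINE_DELETED_GENERATION > initial_generation());
}
```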

Part of #9011

---------

Co-authored-by: Arseny Sher <sher-ars@yandex.ru>
2025-04-17 20:25:30 +00:00
a-masterov
6c2e5c044c random operations test (#10986)
## Problem
We need to test the stability of Neon.

## Summary of changes
The test runs random operations against a Neon project. Via Public API
calls it performs the following operations: `create a branch`, `delete a
branch`, `add a read-only endpoint`, `delete a read-only endpoint`,
`restore a branch to a random position in the past`. All branches
and endpoints are loaded with `pgbench`.

---------

Co-authored-by: Peter Bendel <peterbendel@neon.tech>
Co-authored-by: Alexander Bayandin <alexander@neon.tech>
2025-04-17 19:59:35 +00:00
Alex Chi Z.
748539b222 fix(pageserver): lower L0 compaction threshold (#11617)
## Problem

We saw OOMs due to L0 compaction happening simultaneously for all shards
of the same tenant right after the shard split.

## Summary of changes

Lower the threshold so that we compact fewer L0 layers at a time (see
the arithmetic sketch below).
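
The memory bound this buys, restated from the updated comment in `tenant_conf_defaults` further down (the 256 MB checkpoint distance is implied by the "10 layers ... 2560MB" figure there; this is a worked calculation, not pageserver code):

```rust
// Worked arithmetic from the new comment in tenant_conf_defaults.
const CHECKPOINT_DISTANCE_MB: u64 = 256; // implied by "10 layers ... 2560MB"
const COMPACTION_UPPER_LIMIT: u64 = 10; // new default, down from 20
const L0_COMPACTION_THREADS: u64 = 6; // 3/4 of an 8-CPU pageserver

fn main() {
    let per_job_mb = CHECKPOINT_DISTANCE_MB * COMPACTION_UPPER_LIMIT; // 2560 MB
    let peak_mb = per_job_mb * L0_COMPACTION_THREADS; // 15360 MB worst case
    println!("per job: {per_job_mb} MB, peak across threads: {peak_mb} MB");
}
```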

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-04-17 19:51:28 +00:00
39 changed files with 1114 additions and 1197 deletions

View File

@@ -349,7 +349,7 @@ jobs:
contents: read
statuses: write
needs: [ build-neon ]
runs-on: ${{ fromJSON(format('["self-hosted", "{0}"]', inputs.arch == 'arm64' && 'large-arm64' || 'large')) }}
runs-on: ${{ fromJSON(format('["self-hosted", "{0}"]', inputs.arch == 'arm64' && 'large-arm64' || 'large-metal')) }}
container:
image: ${{ inputs.build-tools-image }}
credentials:

View File

@@ -165,5 +165,5 @@ jobs:
GH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
CURRENT_SHA: ${{ github.sha }}
run: |
RELEASE_PR_RUN_ID=$(gh api "/repos/${GITHUB_REPOSITORY}/actions/runs?head_sha=$CURRENT_SHA" | jq '[.workflow_runs[] | select(.name == "Build and Test") | select(.head_branch | test("^rc/release(-(proxy|compute))?/[0-9]{4}-[0-9]{2}-[0-9]{2}$"; "s"))] | first | .id // ("Failed to find Build and Test run from RC PR!" | halt_error(1))')
RELEASE_PR_RUN_ID=$(gh api "/repos/${GITHUB_REPOSITORY}/actions/runs?head_sha=$CURRENT_SHA" | jq '[.workflow_runs[] | select(.name == "Build and Test") | select(.head_branch | test("^rc/release.*$"; "s"))] | first | .id // ("Failed to find Build and Test run from RC PR!" | halt_error(1))')
echo "release-pr-run-id=$RELEASE_PR_RUN_ID" | tee -a $GITHUB_OUTPUT

View File

@@ -27,15 +27,17 @@ jobs:
- name: Fast forwarding
uses: sequoia-pgp/fast-forward@ea7628bedcb0b0b96e94383ada458d812fca4979
# See https://docs.github.com/en/graphql/reference/enums#mergestatestatus
if: ${{ github.event.pull_request.mergeable_state == 'clean' }}
if: ${{ contains(fromJSON('["clean", "unstable"]'), github.event.pull_request.mergeable_state) }}
with:
merge: true
comment: on-error
github_token: ${{ secrets.CI_ACCESS_TOKEN }}
- name: Comment if mergeable_state is not clean
if: ${{ github.event.pull_request.mergeable_state != 'clean' }}
if: ${{ !contains(fromJSON('["clean", "unstable"]'), github.event.pull_request.mergeable_state) }}
env:
GH_TOKEN: ${{ secrets.CI_ACCESS_TOKEN }}
run: |
gh pr comment ${{ github.event.pull_request.number }} \
--repo "${GITHUB_REPOSITORY}" \
--body "Not trying to forward pull-request, because \`mergeable_state\` is \`${{ github.event.pull_request.mergeable_state }}\`, not \`clean\`."
--body "Not trying to forward pull-request, because \`mergeable_state\` is \`${{ github.event.pull_request.mergeable_state }}\`, not \`clean\` or \`unstable\`."

.github/workflows/random-ops-test.yml (new file, 93 lines added)
View File

@@ -0,0 +1,93 @@
name: Random Operations Test
on:
schedule:
# * is a special character in YAML so you have to quote this string
# ┌───────────── minute (0 - 59)
# │ ┌───────────── hour (0 - 23)
# │ │ ┌───────────── day of the month (1 - 31)
# │ │ │ ┌───────────── month (1 - 12 or JAN-DEC)
# │ │ │ │ ┌───────────── day of the week (0 - 6 or SUN-SAT)
- cron: '23 */2 * * *' # runs every 2 hours
workflow_dispatch:
inputs:
random_seed:
type: number
description: 'The random seed'
required: false
default: 0
num_operations:
type: number
description: "The number of operations to test"
default: 250
defaults:
run:
shell: bash -euxo pipefail {0}
permissions: {}
env:
DEFAULT_PG_VERSION: 16
PLATFORM: neon-captest-new
AWS_DEFAULT_REGION: eu-central-1
jobs:
run-random-rests:
env:
POSTGRES_DISTRIB_DIR: /tmp/neon/pg_install
runs-on: small
permissions:
id-token: write
statuses: write
strategy:
fail-fast: false
matrix:
pg-version: [16, 17]
container:
image: ghcr.io/neondatabase/build-tools:pinned-bookworm
credentials:
username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }}
options: --init
steps:
- name: Harden the runner (Audit all outbound calls)
uses: step-security/harden-runner@4d991eb9b905ef189e4c376166672c3f2f230481 # v2.11.0
with:
egress-policy: audit
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
- name: Download Neon artifact
uses: ./.github/actions/download
with:
name: neon-${{ runner.os }}-${{ runner.arch }}-release-artifact
path: /tmp/neon/
prefix: latest
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
- name: Run tests
uses: ./.github/actions/run-python-test-set
with:
build_type: remote
test_selection: random_ops
run_in_parallel: false
extra_params: -m remote_cluster
pg_version: ${{ matrix.pg-version }}
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
env:
NEON_API_KEY: ${{ secrets.NEON_STAGING_API_KEY }}
RANDOM_SEED: ${{ inputs.random_seed }}
NUM_OPERATIONS: ${{ inputs.num_operations }}
- name: Create Allure report
if: ${{ !cancelled() }}
id: create-allure-report
uses: ./.github/actions/allure-report-generate
with:
store-test-results-into-db: true
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
env:
REGRESS_TEST_RESULT_CONNSTR_NEW: ${{ secrets.REGRESS_TEST_RESULT_CONNSTR_NEW }}

Cargo.lock (generated, 14 lines changed)
View File

@@ -4251,7 +4251,6 @@ dependencies = [
"arc-swap",
"async-compression",
"async-stream",
"base64 0.13.1",
"bincode",
"bit_field",
"byteorder",
@@ -4299,7 +4298,6 @@ dependencies = [
"rand 0.8.5",
"range-set-blaze",
"regex",
"remote_keys",
"remote_storage",
"reqwest",
"rpds",
@@ -5505,16 +5503,6 @@ version = "1.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c707298afce11da2efef2f600116fa93ffa7a032b5d7b628aa17711ec81383ca"
[[package]]
name = "remote_keys"
version = "0.1.0"
dependencies = [
"anyhow",
"rand 0.8.5",
"utils",
"workspace_hack",
]
[[package]]
name = "remote_storage"
version = "0.1.0"
@@ -5530,7 +5518,6 @@ dependencies = [
"azure_identity",
"azure_storage",
"azure_storage_blobs",
"base64 0.13.1",
"bytes",
"camino",
"camino-tempfile",
@@ -5541,7 +5528,6 @@ dependencies = [
"humantime-serde",
"hyper 1.4.1",
"itertools 0.10.5",
"md5",
"metrics",
"once_cell",
"pin-project-lite",

View File

@@ -30,7 +30,6 @@ members = [
"libs/tenant_size_model",
"libs/metrics",
"libs/postgres_connection",
"libs/remote_keys",
"libs/remote_storage",
"libs/tracing-utils",
"libs/postgres_ffi/wal_craft",
@@ -256,7 +255,6 @@ postgres_ffi = { version = "0.1", path = "./libs/postgres_ffi/" }
postgres_initdb = { path = "./libs/postgres_initdb" }
pq_proto = { version = "0.1", path = "./libs/pq_proto/" }
remote_storage = { version = "0.1", path = "./libs/remote_storage/" }
remote_keys = { version = "0.1", path = "./libs/remote_keys/" }
safekeeper_api = { version = "0.1", path = "./libs/safekeeper_api" }
safekeeper_client = { path = "./safekeeper/client" }
desim = { version = "0.1", path = "./libs/desim" }

View File

@@ -1,265 +0,0 @@
commit 00aa659afc9c7336ab81036edec3017168aabf40
Author: Heikki Linnakangas <heikki@neon.tech>
Date: Tue Nov 12 16:59:19 2024 +0200
Temporarily disable test that depends on timezone
diff --git a/tests/expected/generalization.out b/tests/expected/generalization.out
index 23ef5fa..9e60deb 100644
--- a/ext-src/pg_anon-src/tests/expected/generalization.out
+++ b/ext-src/pg_anon-src/tests/expected/generalization.out
@@ -284,12 +284,9 @@ SELECT anon.generalize_tstzrange('19041107','century');
["Tue Jan 01 00:00:00 1901 PST","Mon Jan 01 00:00:00 2001 PST")
(1 row)
-SELECT anon.generalize_tstzrange('19041107','millennium');
- generalize_tstzrange
------------------------------------------------------------------
- ["Thu Jan 01 00:00:00 1001 PST","Mon Jan 01 00:00:00 2001 PST")
-(1 row)
-
+-- temporarily disabled, see:
+-- https://gitlab.com/dalibo/postgresql_anonymizer/-/commit/199f0a392b37c59d92ae441fb8f037e094a11a52#note_2148017485
+--SELECT anon.generalize_tstzrange('19041107','millennium');
-- generalize_daterange
SELECT anon.generalize_daterange('19041107');
generalize_daterange
diff --git a/tests/sql/generalization.sql b/tests/sql/generalization.sql
index b868344..b4fc977 100644
--- a/ext-src/pg_anon-src/tests/sql/generalization.sql
+++ b/ext-src/pg_anon-src/tests/sql/generalization.sql
@@ -61,7 +61,9 @@ SELECT anon.generalize_tstzrange('19041107','month');
SELECT anon.generalize_tstzrange('19041107','year');
SELECT anon.generalize_tstzrange('19041107','decade');
SELECT anon.generalize_tstzrange('19041107','century');
-SELECT anon.generalize_tstzrange('19041107','millennium');
+-- temporarily disabled, see:
+-- https://gitlab.com/dalibo/postgresql_anonymizer/-/commit/199f0a392b37c59d92ae441fb8f037e094a11a52#note_2148017485
+--SELECT anon.generalize_tstzrange('19041107','millennium');
-- generalize_daterange
SELECT anon.generalize_daterange('19041107');
commit 7dd414ee75f2875cffb1d6ba474df1f135a6fc6f
Author: Alexey Masterov <alexeymasterov@neon.tech>
Date: Fri May 31 06:34:26 2024 +0000
These alternative expected files were added to consider the neon features
diff --git a/ext-src/pg_anon-src/tests/expected/permissions_masked_role_1.out b/ext-src/pg_anon-src/tests/expected/permissions_masked_role_1.out
new file mode 100644
index 0000000..2539cfd
--- /dev/null
+++ b/ext-src/pg_anon-src/tests/expected/permissions_masked_role_1.out
@@ -0,0 +1,101 @@
+BEGIN;
+CREATE EXTENSION anon CASCADE;
+NOTICE: installing required extension "pgcrypto"
+SELECT anon.init();
+ init
+------
+ t
+(1 row)
+
+CREATE ROLE mallory_the_masked_user;
+SECURITY LABEL FOR anon ON ROLE mallory_the_masked_user IS 'MASKED';
+CREATE TABLE t1(i INT);
+ALTER TABLE t1 ADD COLUMN t TEXT;
+SECURITY LABEL FOR anon ON COLUMN t1.t
+IS 'MASKED WITH VALUE NULL';
+INSERT INTO t1 VALUES (1,'test');
+--
+-- We're checking the owner's permissions
+--
+-- see
+-- https://postgresql-anonymizer.readthedocs.io/en/latest/SECURITY/#permissions
+--
+SET ROLE mallory_the_masked_user;
+SELECT anon.pseudo_first_name(0) IS NOT NULL;
+ ?column?
+----------
+ t
+(1 row)
+
+-- SHOULD FAIL
+DO $$
+BEGIN
+ PERFORM anon.init();
+ EXCEPTION WHEN insufficient_privilege
+ THEN RAISE NOTICE 'insufficient_privilege';
+END$$;
+NOTICE: insufficient_privilege
+-- SHOULD FAIL
+DO $$
+BEGIN
+ PERFORM anon.anonymize_table('t1');
+ EXCEPTION WHEN insufficient_privilege
+ THEN RAISE NOTICE 'insufficient_privilege';
+END$$;
+NOTICE: insufficient_privilege
+-- SHOULD FAIL
+SAVEPOINT fail_start_engine;
+SELECT anon.start_dynamic_masking();
+ERROR: Only supersusers can start the dynamic masking engine.
+CONTEXT: PL/pgSQL function anon.start_dynamic_masking(boolean) line 18 at RAISE
+ROLLBACK TO fail_start_engine;
+RESET ROLE;
+SELECT anon.start_dynamic_masking();
+ start_dynamic_masking
+-----------------------
+ t
+(1 row)
+
+SET ROLE mallory_the_masked_user;
+SELECT * FROM mask.t1;
+ i | t
+---+---
+ 1 |
+(1 row)
+
+-- SHOULD FAIL
+DO $$
+BEGIN
+ SELECT * FROM public.t1;
+ EXCEPTION WHEN insufficient_privilege
+ THEN RAISE NOTICE 'insufficient_privilege';
+END$$;
+NOTICE: insufficient_privilege
+-- SHOULD FAIL
+SAVEPOINT fail_stop_engine;
+SELECT anon.stop_dynamic_masking();
+ERROR: Only supersusers can stop the dynamic masking engine.
+CONTEXT: PL/pgSQL function anon.stop_dynamic_masking() line 18 at RAISE
+ROLLBACK TO fail_stop_engine;
+RESET ROLE;
+SELECT anon.stop_dynamic_masking();
+NOTICE: The previous priviledges of 'mallory_the_masked_user' are not restored. You need to grant them manually.
+ stop_dynamic_masking
+----------------------
+ t
+(1 row)
+
+SET ROLE mallory_the_masked_user;
+SELECT COUNT(*)=1 FROM anon.pg_masking_rules;
+ ?column?
+----------
+ t
+(1 row)
+
+-- SHOULD FAIL
+SAVEPOINT fail_seclabel_on_role;
+SECURITY LABEL FOR anon ON ROLE mallory_the_masked_user IS NULL;
+ERROR: permission denied
+DETAIL: The current user must have the CREATEROLE attribute.
+ROLLBACK TO fail_seclabel_on_role;
+ROLLBACK;
diff --git a/ext-src/pg_anon-src/tests/expected/permissions_owner_1.out b/ext-src/pg_anon-src/tests/expected/permissions_owner_1.out
new file mode 100644
index 0000000..8b090fe
--- /dev/null
+++ b/ext-src/pg_anon-src/tests/expected/permissions_owner_1.out
@@ -0,0 +1,104 @@
+BEGIN;
+CREATE EXTENSION anon CASCADE;
+NOTICE: installing required extension "pgcrypto"
+SELECT anon.init();
+ init
+------
+ t
+(1 row)
+
+CREATE ROLE oscar_the_owner;
+ALTER DATABASE :DBNAME OWNER TO oscar_the_owner;
+CREATE ROLE mallory_the_masked_user;
+SECURITY LABEL FOR anon ON ROLE mallory_the_masked_user IS 'MASKED';
+--
+-- We're checking the owner's permissions
+--
+-- see
+-- https://postgresql-anonymizer.readthedocs.io/en/latest/SECURITY/#permissions
+--
+SET ROLE oscar_the_owner;
+SELECT anon.pseudo_first_name(0) IS NOT NULL;
+ ?column?
+----------
+ t
+(1 row)
+
+-- SHOULD FAIL
+DO $$
+BEGIN
+ PERFORM anon.init();
+ EXCEPTION WHEN insufficient_privilege
+ THEN RAISE NOTICE 'insufficient_privilege';
+END$$;
+NOTICE: insufficient_privilege
+CREATE TABLE t1(i INT);
+ALTER TABLE t1 ADD COLUMN t TEXT;
+SECURITY LABEL FOR anon ON COLUMN t1.t
+IS 'MASKED WITH VALUE NULL';
+INSERT INTO t1 VALUES (1,'test');
+SELECT anon.anonymize_table('t1');
+ anonymize_table
+-----------------
+ t
+(1 row)
+
+SELECT * FROM t1;
+ i | t
+---+---
+ 1 |
+(1 row)
+
+UPDATE t1 SET t='test' WHERE i=1;
+-- SHOULD FAIL
+SAVEPOINT fail_start_engine;
+SELECT anon.start_dynamic_masking();
+ start_dynamic_masking
+-----------------------
+ t
+(1 row)
+
+ROLLBACK TO fail_start_engine;
+RESET ROLE;
+SELECT anon.start_dynamic_masking();
+ start_dynamic_masking
+-----------------------
+ t
+(1 row)
+
+SET ROLE oscar_the_owner;
+SELECT * FROM t1;
+ i | t
+---+------
+ 1 | test
+(1 row)
+
+--SELECT * FROM mask.t1;
+-- SHOULD FAIL
+SAVEPOINT fail_stop_engine;
+SELECT anon.stop_dynamic_masking();
+ERROR: permission denied for schema mask
+CONTEXT: SQL statement "DROP VIEW mask.t1;"
+PL/pgSQL function anon.mask_drop_view(oid) line 3 at EXECUTE
+SQL statement "SELECT anon.mask_drop_view(oid)
+ FROM pg_catalog.pg_class
+ WHERE relnamespace=quote_ident(pg_catalog.current_setting('anon.sourceschema'))::REGNAMESPACE
+ AND relkind IN ('r','p','f')"
+PL/pgSQL function anon.stop_dynamic_masking() line 22 at PERFORM
+ROLLBACK TO fail_stop_engine;
+RESET ROLE;
+SELECT anon.stop_dynamic_masking();
+NOTICE: The previous priviledges of 'mallory_the_masked_user' are not restored. You need to grant them manually.
+ stop_dynamic_masking
+----------------------
+ t
+(1 row)
+
+SET ROLE oscar_the_owner;
+-- SHOULD FAIL
+SAVEPOINT fail_seclabel_on_role;
+SECURITY LABEL FOR anon ON ROLE mallory_the_masked_user IS NULL;
+ERROR: permission denied
+DETAIL: The current user must have the CREATEROLE attribute.
+ROLLBACK TO fail_seclabel_on_role;
+ROLLBACK;

View File

@@ -641,7 +641,26 @@ impl ComputeNode {
let log_directory_path = Path::new(&self.params.pgdata).join("log");
let log_directory_path = log_directory_path.to_string_lossy().to_string();
configure_audit_rsyslog(log_directory_path.clone(), "hipaa", &remote_endpoint)?;
// Add project_id,endpoint_id tag to identify the logs.
//
// These ids are passed from cplane,
// for backwards compatibility (old computes that don't have them),
// we set them to None.
// TODO: Clean up this code when all computes have them.
let tag: Option<String> = match (
pspec.spec.project_id.as_deref(),
pspec.spec.endpoint_id.as_deref(),
) {
(Some(project_id), Some(endpoint_id)) => {
Some(format!("{project_id}/{endpoint_id}"))
}
(Some(project_id), None) => Some(format!("{project_id}/None")),
(None, Some(endpoint_id)) => Some(format!("None,{endpoint_id}")),
(None, None) => None,
};
configure_audit_rsyslog(log_directory_path.clone(), tag, &remote_endpoint)?;
// Launch a background task to clean up the audit logs
launch_pgaudit_gc(log_directory_path);

View File

@@ -50,13 +50,13 @@ fn restart_rsyslog() -> Result<()> {
pub fn configure_audit_rsyslog(
log_directory: String,
tag: &str,
tag: Option<String>,
remote_endpoint: &str,
) -> Result<()> {
let config_content: String = format!(
include_str!("config_template/compute_audit_rsyslog_template.conf"),
log_directory = log_directory,
tag = tag,
tag = tag.unwrap_or("".to_string()),
remote_endpoint = remote_endpoint
);

View File

@@ -682,10 +682,10 @@ pub mod tenant_conf_defaults {
pub const DEFAULT_COMPACTION_SHARD_ANCESTOR: bool = true;
// This value needs to be tuned to avoid OOM. We have 3/4*CPUs threads for L0 compaction, that's
// 3/4*16=9 on most of our pageservers. Compacting 20 layers requires about 1 GB memory (could
// be reduced later by optimizing L0 hole calculation to avoid loading all keys into memory). So
// with this config, we can get a maximum peak compaction usage of 9 GB.
pub const DEFAULT_COMPACTION_UPPER_LIMIT: usize = 20;
// 3/4*8=6 on most of our pageservers. Compacting 10 layers requires a maximum of
// DEFAULT_CHECKPOINT_DISTANCE*10 memory, that's 2560MB. So with this config, we can get a maximum peak
// compaction usage of 15360MB.
pub const DEFAULT_COMPACTION_UPPER_LIMIT: usize = 10;
// Enable L0 compaction pass and semaphore by default. L0 compaction must be responsive to avoid
// read amp.
pub const DEFAULT_COMPACTION_L0_FIRST: bool = true;
@@ -702,8 +702,11 @@ pub mod tenant_conf_defaults {
// Relevant: https://github.com/neondatabase/neon/issues/3394
pub const DEFAULT_GC_PERIOD: &str = "1 hr";
pub const DEFAULT_IMAGE_CREATION_THRESHOLD: usize = 3;
// If there are more than threshold * compaction_threshold (that is 3 * 10 in the default config) L0 layers, image
// layer creation will end immediately. Set to 0 to disable.
// Currently, any value other than 0 will trigger image layer creation preemption immediately with L0 backpressure
// without looking at the exact number of L0 layers.
// It was expected to have the following behavior:
// > If there are more than threshold * compaction_threshold (that is 3 * 10 in the default config) L0 layers, image
// > layer creation will end immediately. Set to 0 to disable.
pub const DEFAULT_IMAGE_CREATION_PREEMPT_THRESHOLD: usize = 3;
pub const DEFAULT_PITR_INTERVAL: &str = "7 days";
pub const DEFAULT_WALRECEIVER_CONNECT_TIMEOUT: &str = "10 seconds";

View File

@@ -1,13 +0,0 @@
[package]
name = "remote_keys"
version = "0.1.0"
edition = "2024"
license.workspace = true
[dependencies]
anyhow.workspace = true
utils.workspace = true
workspace_hack.workspace = true
[dev-dependencies]
rand.workspace = true

View File

@@ -1,42 +0,0 @@
//! A module that provides a KMS implementation that generates and unwraps the keys.
//!
/// A KMS implementation that does static wrapping and unwrapping of the keys.
pub struct NaiveKms {
account_id: String,
}
impl NaiveKms {
pub fn new(account_id: String) -> Self {
Self { account_id }
}
pub fn encrypt(&self, plain: &[u8]) -> anyhow::Result<Vec<u8>> {
let wrapped = [self.account_id.as_bytes(), "-wrapped-".as_bytes(), plain].concat();
Ok(wrapped)
}
pub fn decrypt(&self, wrapped: &[u8]) -> anyhow::Result<Vec<u8>> {
let Some(wrapped) = wrapped.strip_prefix(self.account_id.as_bytes()) else {
return Err(anyhow::anyhow!("invalid key"));
};
let Some(plain) = wrapped.strip_prefix(b"-wrapped-") else {
return Err(anyhow::anyhow!("invalid key"));
};
Ok(plain.to_vec())
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_generate_key() {
let kms = NaiveKms::new("test-tenant".to_string());
let data = rand::random::<[u8; 32]>().to_vec();
let encrypted = kms.encrypt(&data).unwrap();
let decrypted = kms.decrypt(&encrypted).unwrap();
assert_eq!(data, decrypted);
}
}

View File

@@ -13,7 +13,6 @@ aws-smithy-async.workspace = true
aws-smithy-types.workspace = true
aws-config.workspace = true
aws-sdk-s3.workspace = true
base64.workspace = true
bytes.workspace = true
camino = { workspace = true, features = ["serde1"] }
humantime-serde.workspace = true
@@ -28,7 +27,6 @@ tokio-util = { workspace = true, features = ["compat"] }
toml_edit.workspace = true
tracing.workspace = true
scopeguard.workspace = true
md5.workspace = true
metrics.workspace = true
utils = { path = "../utils", default-features = false }
pin-project-lite.workspace = true

View File

@@ -550,19 +550,6 @@ impl RemoteStorage for AzureBlobStorage {
self.download_for_builder(builder, timeout, cancel).await
}
#[allow(unused_variables)]
async fn upload_with_encryption(
&self,
from: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
data_size_bytes: usize,
to: &RemotePath,
metadata: Option<StorageMetadata>,
encryption_key: Option<&[u8]>,
cancel: &CancellationToken,
) -> anyhow::Result<()> {
unimplemented!()
}
async fn delete(&self, path: &RemotePath, cancel: &CancellationToken) -> anyhow::Result<()> {
self.delete_objects(std::array::from_ref(path), cancel)
.await

View File

@@ -190,8 +190,6 @@ pub struct DownloadOpts {
/// timeouts: for something like an index/manifest/heatmap, we should time out faster than
/// for layer files
pub kind: DownloadKind,
/// The encryption key to use for the download.
pub encryption_key: Option<Vec<u8>>,
}
pub enum DownloadKind {
@@ -206,7 +204,6 @@ impl Default for DownloadOpts {
byte_start: Bound::Unbounded,
byte_end: Bound::Unbounded,
kind: DownloadKind::Large,
encryption_key: None,
}
}
}
@@ -244,15 +241,6 @@ impl DownloadOpts {
None => format!("bytes={start}-"),
})
}
pub fn with_encryption_key(mut self, encryption_key: Option<impl AsRef<[u8]>>) -> Self {
self.encryption_key = encryption_key.map(|k| k.as_ref().to_vec());
self
}
pub fn encryption_key(&self) -> Option<&[u8]> {
self.encryption_key.as_deref()
}
}
/// Storage (potentially remote) API to manage its state.
@@ -343,19 +331,6 @@ pub trait RemoteStorage: Send + Sync + 'static {
cancel: &CancellationToken,
) -> Result<Download, DownloadError>;
/// Same as upload, but with remote encryption if the backend supports it (e.g. SSE-C on AWS).
async fn upload_with_encryption(
&self,
from: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
// S3 PUT request requires the content length to be specified,
// otherwise it starts to fail with the concurrent connection count increasing.
data_size_bytes: usize,
to: &RemotePath,
metadata: Option<StorageMetadata>,
encryption_key: Option<&[u8]>,
cancel: &CancellationToken,
) -> anyhow::Result<()>;
/// Delete a single path from remote storage.
///
/// If the operation fails because of timeout or cancellation, the root cause of the error will be
@@ -640,63 +615,6 @@ impl<Other: RemoteStorage> GenericRemoteStorage<Arc<Other>> {
}
}
}
pub async fn upload_with_encryption(
&self,
from: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
data_size_bytes: usize,
to: &RemotePath,
metadata: Option<StorageMetadata>,
encryption_key: Option<&[u8]>,
cancel: &CancellationToken,
) -> anyhow::Result<()> {
match self {
Self::LocalFs(s) => {
s.upload_with_encryption(
from,
data_size_bytes,
to,
metadata,
encryption_key,
cancel,
)
.await
}
Self::AwsS3(s) => {
s.upload_with_encryption(
from,
data_size_bytes,
to,
metadata,
encryption_key,
cancel,
)
.await
}
Self::AzureBlob(s) => {
s.upload_with_encryption(
from,
data_size_bytes,
to,
metadata,
encryption_key,
cancel,
)
.await
}
Self::Unreliable(s) => {
s.upload_with_encryption(
from,
data_size_bytes,
to,
metadata,
encryption_key,
cancel,
)
.await
}
}
}
}
impl GenericRemoteStorage {

View File

@@ -198,10 +198,6 @@ impl LocalFs {
let mut entries = cur_folder.read_dir_utf8()?;
while let Some(Ok(entry)) = entries.next() {
let file_name = entry.file_name();
if file_name.ends_with(".metadata") || file_name.ends_with(".enc") {
// ignore metadata and encryption key files
continue;
}
let full_file_name = cur_folder.join(file_name);
if full_file_name.as_str().starts_with(prefix) {
let file_remote_path = self.local_file_to_relative_path(full_file_name.clone());
@@ -222,7 +218,6 @@ impl LocalFs {
data_size_bytes: usize,
to: &RemotePath,
metadata: Option<StorageMetadata>,
enctyption_key: Option<&[u8]>,
cancel: &CancellationToken,
) -> anyhow::Result<()> {
let target_file_path = to.with_base(&self.storage_root);
@@ -311,8 +306,6 @@ impl LocalFs {
)
})?;
// TODO: we might need to make the following writes atomic with the file write operation above
if let Some(storage_metadata) = metadata {
// FIXME: we must not be using metadata much, since this would forget the old metadata
// for new writes? or perhaps metadata is sticky; could consider removing if it's never
@@ -331,15 +324,6 @@ impl LocalFs {
})?;
}
if let Some(encryption_key) = enctyption_key {
let encryption_key_path = storage_encryption_key_path(&target_file_path);
fs::write(&encryption_key_path, encryption_key).await.with_context(|| {
format!(
"Failed to write encryption key to the local storage at '{encryption_key_path}'",
)
})?;
}
Ok(())
}
}
@@ -466,7 +450,6 @@ impl RemoteStorage for LocalFs {
key: &RemotePath,
_cancel: &CancellationToken,
) -> Result<ListingObject, DownloadError> {
// TODO: check encryption key
let target_file_path = key.with_base(&self.storage_root);
let metadata = file_metadata(&target_file_path).await?;
Ok(ListingObject {
@@ -478,14 +461,34 @@ impl RemoteStorage for LocalFs {
async fn upload(
&self,
data: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
data: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync,
data_size_bytes: usize,
to: &RemotePath,
metadata: Option<StorageMetadata>,
cancel: &CancellationToken,
) -> anyhow::Result<()> {
self.upload_with_encryption(data, data_size_bytes, to, metadata, None, cancel)
.await
let cancel = cancel.child_token();
let op = self.upload0(data, data_size_bytes, to, metadata, &cancel);
let mut op = std::pin::pin!(op);
// race the upload0 to the timeout; if it goes over, do a graceful shutdown
let (res, timeout) = tokio::select! {
res = &mut op => (res, false),
_ = tokio::time::sleep(self.timeout) => {
cancel.cancel();
(op.await, true)
}
};
match res {
Err(e) if timeout && TimeoutOrCancel::caused_by_cancel(&e) => {
// we caused this cancel (or they happened simultaneously) -- swap it out to
// Timeout
Err(TimeoutOrCancel::Timeout.into())
}
res => res,
}
}
async fn download(
@@ -503,22 +506,6 @@ impl RemoteStorage for LocalFs {
return Err(DownloadError::Unmodified);
}
let key = match fs::read(storage_encryption_key_path(&target_path)).await {
Ok(key) => Some(key),
Err(e) if e.kind() == ErrorKind::NotFound => None,
Err(e) => {
return Err(DownloadError::Other(
anyhow::anyhow!(e).context("cannot read encryption key"),
));
}
};
if key != opts.encryption_key {
return Err(DownloadError::Other(anyhow::anyhow!(
"encryption key mismatch"
)));
}
let mut file = fs::OpenOptions::new()
.read(true)
.open(&target_path)
@@ -564,53 +551,12 @@ impl RemoteStorage for LocalFs {
async fn delete(&self, path: &RemotePath, _cancel: &CancellationToken) -> anyhow::Result<()> {
let file_path = path.with_base(&self.storage_root);
match fs::remove_file(&file_path).await {
Ok(()) => {}
Ok(()) => Ok(()),
// The file doesn't exist. This shouldn't yield an error to mirror S3's behaviour.
// See https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObject.html
// > If there isn't a null version, Amazon S3 does not remove any objects but will still respond that the command was successful.
Err(e) if e.kind() == ErrorKind::NotFound => {}
Err(e) => return Err(anyhow::anyhow!(e)),
};
fs::remove_file(&storage_metadata_path(&file_path))
.await
.ok();
fs::remove_file(&storage_encryption_key_path(&file_path))
.await
.ok();
Ok(())
}
#[allow(unused_variables)]
async fn upload_with_encryption(
&self,
data: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
data_size_bytes: usize,
to: &RemotePath,
metadata: Option<StorageMetadata>,
encryption_key: Option<&[u8]>,
cancel: &CancellationToken,
) -> anyhow::Result<()> {
let cancel = cancel.child_token();
let op = self.upload0(data, data_size_bytes, to, metadata, encryption_key, &cancel);
let mut op = std::pin::pin!(op);
// race the upload0 to the timeout; if it goes over, do a graceful shutdown
let (res, timeout) = tokio::select! {
res = &mut op => (res, false),
_ = tokio::time::sleep(self.timeout) => {
cancel.cancel();
(op.await, true)
}
};
match res {
Err(e) if timeout && TimeoutOrCancel::caused_by_cancel(&e) => {
// we caused this cancel (or they happened simultaneously) -- swap it out to
// Timeout
Err(TimeoutOrCancel::Timeout.into())
}
res => res,
Err(e) if e.kind() == ErrorKind::NotFound => Ok(()),
Err(e) => Err(anyhow::anyhow!(e)),
}
}
@@ -645,7 +591,6 @@ impl RemoteStorage for LocalFs {
to_path = to_path
)
})?;
// TODO: copy metadata and encryption key
Ok(())
}
@@ -664,10 +609,6 @@ fn storage_metadata_path(original_path: &Utf8Path) -> Utf8PathBuf {
path_with_suffix_extension(original_path, "metadata")
}
fn storage_encryption_key_path(original_path: &Utf8Path) -> Utf8PathBuf {
path_with_suffix_extension(original_path, "enc")
}
async fn create_target_directory(target_file_path: &Utf8Path) -> anyhow::Result<()> {
let target_dir = match target_file_path.parent() {
Some(parent_dir) => parent_dir,

View File

@@ -66,10 +66,7 @@ struct GetObjectRequest {
key: String,
etag: Option<String>,
range: Option<String>,
/// Base64 encoded SSE-C key for server-side encryption.
sse_c_key: Option<Vec<u8>>,
}
impl S3Bucket {
/// Creates the S3 storage, errors if incorrect AWS S3 configuration provided.
pub async fn new(remote_storage_config: &S3Config, timeout: Duration) -> anyhow::Result<Self> {
@@ -260,13 +257,6 @@ impl S3Bucket {
builder = builder.if_none_match(etag);
}
if let Some(encryption_key) = request.sse_c_key {
builder = builder.sse_customer_algorithm("AES256");
builder = builder.sse_customer_key(base64::encode(&encryption_key));
builder = builder
.sse_customer_key_md5(base64::encode(md5::compute(&encryption_key).as_slice()));
}
let get_object = builder.send();
let get_object = tokio::select! {
@@ -703,13 +693,12 @@ impl RemoteStorage for S3Bucket {
})
}
async fn upload_with_encryption(
async fn upload(
&self,
from: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
from_size_bytes: usize,
to: &RemotePath,
metadata: Option<StorageMetadata>,
encryption_key: Option<&[u8]>,
cancel: &CancellationToken,
) -> anyhow::Result<()> {
let kind = RequestKind::Put;
@@ -720,7 +709,7 @@ impl RemoteStorage for S3Bucket {
let body = StreamBody::new(from.map(|x| x.map(Frame::data)));
let bytes_stream = ByteStream::new(SdkBody::from_body_1_x(body));
let mut upload = self
let upload = self
.client
.put_object()
.bucket(self.bucket_name.clone())
@@ -728,17 +717,8 @@ impl RemoteStorage for S3Bucket {
.set_metadata(metadata.map(|m| m.0))
.set_storage_class(self.upload_storage_class.clone())
.content_length(from_size_bytes.try_into()?)
.body(bytes_stream);
if let Some(encryption_key) = encryption_key {
upload = upload.sse_customer_algorithm("AES256");
let base64_key = base64::encode(encryption_key);
upload = upload.sse_customer_key(&base64_key);
upload = upload
.sse_customer_key_md5(base64::encode(md5::compute(encryption_key).as_slice()));
}
let upload = upload.send();
.body(bytes_stream)
.send();
let upload = tokio::time::timeout(self.timeout, upload);
@@ -762,18 +742,6 @@ impl RemoteStorage for S3Bucket {
}
}
async fn upload(
&self,
from: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
data_size_bytes: usize,
to: &RemotePath,
metadata: Option<StorageMetadata>,
cancel: &CancellationToken,
) -> anyhow::Result<()> {
self.upload_with_encryption(from, data_size_bytes, to, metadata, None, cancel)
.await
}
async fn copy(
&self,
from: &RemotePath,
@@ -833,7 +801,6 @@ impl RemoteStorage for S3Bucket {
key: self.relative_path_to_s3_object(from),
etag: opts.etag.as_ref().map(|e| e.to_string()),
range: opts.byte_range_header(),
sse_c_key: opts.encryption_key.clone(),
},
cancel,
)

View File

@@ -178,19 +178,6 @@ impl RemoteStorage for UnreliableWrapper {
self.inner.download(from, opts, cancel).await
}
#[allow(unused_variables)]
async fn upload_with_encryption(
&self,
from: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
data_size_bytes: usize,
to: &RemotePath,
metadata: Option<StorageMetadata>,
encryption_key: Option<&[u8]>,
cancel: &CancellationToken,
) -> anyhow::Result<()> {
unimplemented!()
}
async fn delete(&self, path: &RemotePath, cancel: &CancellationToken) -> anyhow::Result<()> {
self.delete_inner(path, true, cancel).await
}

View File

@@ -421,7 +421,7 @@ async fn download_is_timeouted(ctx: &mut MaybeEnabledStorage) {
))
.unwrap();
let len = upload_large_enough_file(&ctx.client, &path, &cancel, None).await;
let len = upload_large_enough_file(&ctx.client, &path, &cancel).await;
let timeout = std::time::Duration::from_secs(5);
@@ -500,7 +500,7 @@ async fn download_is_cancelled(ctx: &mut MaybeEnabledStorage) {
))
.unwrap();
let file_len = upload_large_enough_file(&ctx.client, &path, &cancel, None).await;
let file_len = upload_large_enough_file(&ctx.client, &path, &cancel).await;
{
let stream = ctx
@@ -555,7 +555,6 @@ async fn upload_large_enough_file(
client: &GenericRemoteStorage,
path: &RemotePath,
cancel: &CancellationToken,
encryption_key: Option<&[u8]>,
) -> usize {
let header = bytes::Bytes::from_static("remote blob data content".as_bytes());
let body = bytes::Bytes::from(vec![0u8; 1024]);
@@ -566,54 +565,9 @@ async fn upload_large_enough_file(
let contents = futures::stream::iter(contents.map(std::io::Result::Ok));
client
.upload_with_encryption(contents, len, path, None, encryption_key, cancel)
.upload(contents, len, path, None, cancel)
.await
.expect("upload succeeds");
len
}
#[test_context(MaybeEnabledStorage)]
#[tokio::test]
async fn encryption_works(ctx: &mut MaybeEnabledStorage) {
let MaybeEnabledStorage::Enabled(ctx) = ctx else {
return;
};
let cancel = CancellationToken::new();
let path = RemotePath::new(Utf8Path::new(
format!("{}/file_to_copy", ctx.base_prefix).as_str(),
))
.unwrap();
let key = rand::random::<[u8; 32]>();
let file_len = upload_large_enough_file(&ctx.client, &path, &cancel, Some(&key)).await;
{
let download = ctx
.client
.download(
&path,
&DownloadOpts::default().with_encryption_key(Some(&key)),
&cancel,
)
.await
.expect("should succeed");
let vec = download_to_vec(download).await.expect("should succeed");
assert_eq!(vec.len(), file_len);
}
{
// Download without encryption key should fail
let download = ctx
.client
.download(&path, &DownloadOpts::default(), &cancel)
.await;
assert!(download.is_err());
}
let cancel = CancellationToken::new();
ctx.client.delete_objects(&[path], &cancel).await.unwrap();
}

View File

@@ -17,7 +17,6 @@ anyhow.workspace = true
arc-swap.workspace = true
async-compression.workspace = true
async-stream.workspace = true
base64.workspace = true
bit_field.workspace = true
bincode.workspace = true
byteorder.workspace = true
@@ -83,7 +82,6 @@ postgres_connection.workspace = true
postgres_ffi.workspace = true
pq_proto.workspace = true
remote_storage.workspace = true
remote_keys.workspace = true
storage_broker.workspace = true
tenant_size_model.workspace = true
http-utils.workspace = true

View File

@@ -45,7 +45,6 @@ fn bench_upload_queue_next_ready(c: &mut Criterion) {
shard: ShardIndex::new(ShardNumber(1), ShardCount(2)),
generation: Generation::Valid(1),
file_size: 0,
encryption_key: None,
};
// Construct the (initial and uploaded) index with layer0.

View File

@@ -192,12 +192,11 @@ pub(crate) use download::{
download_index_part, download_initdb_tar_zst, download_tenant_manifest, is_temp_download_file,
list_remote_tenant_shards, list_remote_timelines,
};
use index::GcCompactionState;
pub(crate) use index::LayerFileMetadata;
use index::{EncryptionKey, EncryptionKeyId, EncryptionKeyPair, GcCompactionState, KeyVersion};
use pageserver_api::models::{RelSizeMigration, TimelineArchivalState, TimelineVisibilityState};
use pageserver_api::shard::{ShardIndex, TenantShardId};
use regex::Regex;
use remote_keys::NaiveKms;
use remote_storage::{
DownloadError, GenericRemoteStorage, ListingMode, RemotePath, TimeoutOrCancel,
};
@@ -368,10 +367,6 @@ pub(crate) struct RemoteTimelineClient {
config: std::sync::RwLock<RemoteTimelineClientConfig>,
cancel: CancellationToken,
kms_impl: Option<NaiveKms>,
key_repo: std::sync::Mutex<HashMap<EncryptionKeyId, EncryptionKeyPair>>,
}
impl Drop for RemoteTimelineClient {
@@ -416,9 +411,6 @@ impl RemoteTimelineClient {
)),
config: std::sync::RwLock::new(RemoteTimelineClientConfig::from(location_conf)),
cancel: CancellationToken::new(),
// TODO: make this configurable
kms_impl: Some(NaiveKms::new(tenant_shard_id.tenant_id.to_string())),
key_repo: std::sync::Mutex::new(HashMap::new()),
}
}
@@ -735,43 +727,9 @@ impl RemoteTimelineClient {
reason: "no need for a downloads gauge",
},
);
let key_pair = if let Some(ref key_id) = layer_metadata.encryption_key {
let wrapped_key = {
let mut queue = self.upload_queue.lock().unwrap();
let upload_queue = queue.initialized_mut().unwrap();
let encryption_key_pair =
upload_queue.dirty.keys.iter().find(|key| &key.id == key_id);
if let Some(encryption_key_pair) = encryption_key_pair {
// TODO: also check if we have uploaded the key yet; we should never use a key that is not persisted
encryption_key_pair.clone()
} else {
return Err(DownloadError::Other(anyhow::anyhow!(
"Encryption key pair not found in index_part.json"
)));
}
};
let Some(kms) = self.kms_impl.as_ref() else {
return Err(DownloadError::Other(anyhow::anyhow!(
"KMS not configured when downloading encrypted layer file"
)));
};
let plain_key = kms
.decrypt(&wrapped_key.key)
.context("failed to decrypt encryption key")
.map_err(DownloadError::Other)?;
Some(EncryptionKeyPair::new(
wrapped_key.id,
plain_key,
wrapped_key.key,
))
} else {
None
};
download::download_layer_file(
self.conf,
&self.storage_impl,
key_pair.as_ref(),
self.tenant_shard_id,
self.timeline_id,
layer_file_name,
@@ -1292,14 +1250,6 @@ impl RemoteTimelineClient {
upload_queue: &mut UploadQueueInitialized,
layer: ResidentLayer,
) {
let key_pair = {
if let Some(key_id) = layer.metadata().encryption_key {
let guard = self.key_repo.lock().unwrap();
Some(guard.get(&key_id).cloned().unwrap())
} else {
None
}
};
let metadata = layer.metadata();
upload_queue
@@ -1314,7 +1264,7 @@ impl RemoteTimelineClient {
"scheduled layer file upload {layer}",
);
let op = UploadOp::UploadLayer(layer, metadata, key_pair, None);
let op = UploadOp::UploadLayer(layer, metadata, None);
self.metric_begin(&op);
upload_queue.queued_operations.push_back(op);
}
@@ -1496,58 +1446,6 @@ impl RemoteTimelineClient {
upload_queue.queued_operations.push_back(op);
}
#[allow(dead_code)]
fn is_kms_enabled(&self) -> bool {
self.kms_impl.is_some()
}
pub(crate) fn schedule_generate_encryption_key(
self: &Arc<Self>,
) -> Result<Option<EncryptionKeyPair>, NotInitialized> {
let Some(kms_impl) = self.kms_impl.as_ref() else {
return Ok(None);
};
let plain_key = rand::random::<[u8; 32]>().to_vec(); // StdRng is cryptographically secure (?)
let wrapped_key = kms_impl.encrypt(&plain_key).unwrap();
let mut guard = self.upload_queue.lock().unwrap();
let upload_queue = guard.initialized_mut()?;
let last_key = upload_queue.dirty.keys.last();
let this_key_version = if let Some(last_key) = last_key {
let key_version = EncryptionKeyId {
version: last_key.id.version.next(),
generation: self.generation,
};
assert!(key_version > last_key.id); // ensure key version is strictly increasing; no dup key versions
key_version
} else {
EncryptionKeyId {
version: KeyVersion(1),
generation: self.generation,
}
};
let key_pair = EncryptionKeyPair {
id: this_key_version.clone(),
plain_key: plain_key.clone(),
wrapped_key,
};
upload_queue.dirty.keys.push(EncryptionKey {
key: plain_key,
id: this_key_version,
created_at: Utc::now().naive_utc(),
});
self.key_repo.lock().unwrap().insert(this_key_version, key_pair);
self.schedule_index_upload(upload_queue);
Ok(Some(key_pair))
}
/// Schedules a compaction update to the remote `index_part.json`.
///
/// `compacted_from` represent the L0 names which have been `compacted_to` L1 layers.
@@ -1556,7 +1454,6 @@ impl RemoteTimelineClient {
compacted_from: &[Layer],
compacted_to: &[ResidentLayer],
) -> Result<(), NotInitialized> {
// Use the same key for all layers in a single compaction job
let mut guard = self.upload_queue.lock().unwrap();
let upload_queue = guard.initialized_mut()?;
@@ -1818,7 +1715,6 @@ impl RemoteTimelineClient {
uploaded.local_path(),
&remote_path,
uploaded.metadata().file_size,
None, // TODO(chi): support encryption for those layer files uploaded using this interface
cancel,
)
.await
@@ -1861,8 +1757,6 @@ impl RemoteTimelineClient {
adopted_as.metadata().generation,
);
// TODO: support encryption for those layer files uploaded using this interface
backoff::retry(
|| async {
upload::copy_timeline_layer(
@@ -2083,7 +1977,7 @@ impl RemoteTimelineClient {
// Prepare upload.
match &mut next_op {
UploadOp::UploadLayer(layer, meta, _, mode) => {
UploadOp::UploadLayer(layer, meta, mode) => {
if upload_queue
.recently_deleted
.remove(&(layer.layer_desc().layer_name().clone(), meta.generation))
@@ -2177,7 +2071,7 @@ impl RemoteTimelineClient {
// Assert that we don't modify a layer that's referenced by the current index.
if cfg!(debug_assertions) {
let modified = match &task.op {
UploadOp::UploadLayer(layer, layer_metadata, _, _) => {
UploadOp::UploadLayer(layer, layer_metadata, _) => {
vec![(layer.layer_desc().layer_name(), layer_metadata)]
}
UploadOp::Delete(delete) => {
@@ -2199,7 +2093,7 @@ impl RemoteTimelineClient {
}
let upload_result: anyhow::Result<()> = match &task.op {
UploadOp::UploadLayer(layer, layer_metadata, encryption_key_pair, mode) => {
UploadOp::UploadLayer(layer, layer_metadata, mode) => {
// TODO: check if this mechanism can be removed now that can_bypass() performs
// conflict checks during scheduling.
if let Some(OpType::FlushDeletion) = mode {
@@ -2280,7 +2174,6 @@ impl RemoteTimelineClient {
local_path,
&remote_path,
layer_metadata.file_size,
encryption_key_pair.clone(),
&self.cancel,
)
.measure_remote_op(
@@ -2431,7 +2324,7 @@ impl RemoteTimelineClient {
upload_queue.inprogress_tasks.remove(&task.task_id);
let lsn_update = match task.op {
UploadOp::UploadLayer(_, _, _, _) => None,
UploadOp::UploadLayer(_, _, _) => None,
UploadOp::UploadMetadata { ref uploaded } => {
// the task id is reused as a monotonicity check for storing the "clean"
// IndexPart.
@@ -2510,7 +2403,7 @@ impl RemoteTimelineClient {
)> {
use RemoteTimelineClientMetricsCallTrackSize::DontTrackSize;
let res = match op {
UploadOp::UploadLayer(_, m, _, _) => (
UploadOp::UploadLayer(_, m, _) => (
RemoteOpFileKind::Layer,
RemoteOpKind::Upload,
RemoteTimelineClientMetricsCallTrackSize::Bytes(m.file_size),
@@ -2894,10 +2787,6 @@ mod tests {
for entry in std::fs::read_dir(remote_path).unwrap().flatten() {
let entry_name = entry.file_name();
let fname = entry_name.to_str().unwrap();
if fname.ends_with(".metadata") || fname.ends_with(".enc") {
// ignore metadata and encryption key files; should use local_fs APIs instead in the future
continue;
}
found.push(String::from(fname));
}
found.sort();
@@ -2951,8 +2840,6 @@ mod tests {
)),
config: std::sync::RwLock::new(RemoteTimelineClientConfig::from(&location_conf)),
cancel: CancellationToken::new(),
kms_impl: None,
key_repo: std::sync::Mutex::new(HashMap::new()),
})
}

View File

@@ -23,7 +23,7 @@ use utils::crashsafe::path_with_suffix_extension;
use utils::id::{TenantId, TimelineId};
use utils::{backoff, pausable_failpoint};
use super::index::{EncryptionKeyPair, IndexPart, LayerFileMetadata};
use super::index::{IndexPart, LayerFileMetadata};
use super::manifest::TenantManifest;
use super::{
FAILED_DOWNLOAD_WARN_THRESHOLD, FAILED_REMOTE_OP_RETRIES, INITDB_PATH, parse_remote_index_path,
@@ -51,7 +51,6 @@ use crate::virtual_file::{MaybeFatalIo, VirtualFile, on_fatal_io_error};
pub async fn download_layer_file<'a>(
conf: &'static PageServerConf,
storage: &'a GenericRemoteStorage,
key_pair: Option<&'a EncryptionKeyPair>,
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
layer_file_name: &'a LayerName,
@@ -87,16 +86,7 @@ pub async fn download_layer_file<'a>(
let bytes_amount = download_retry(
|| async {
download_object(
storage,
key_pair,
&remote_path,
&temp_file_path,
gate,
cancel,
ctx,
)
.await
download_object(storage, &remote_path, &temp_file_path, gate, cancel, ctx).await
},
&format!("download {remote_path:?}"),
cancel,
@@ -155,7 +145,6 @@ pub async fn download_layer_file<'a>(
/// The unlinking has _not_ been made durable.
async fn download_object(
storage: &GenericRemoteStorage,
encryption_key_pair: Option<&EncryptionKeyPair>,
src_path: &RemotePath,
dst_path: &Utf8PathBuf,
#[cfg_attr(target_os = "macos", allow(unused_variables))] gate: &utils::sync::gate::Gate,
@@ -171,12 +160,9 @@ async fn download_object(
.with_context(|| format!("create a destination file for layer '{dst_path}'"))
.map_err(DownloadError::Other)?;
let mut opts = DownloadOpts::default();
if let Some(encryption_key_pair) = encryption_key_pair {
opts.encryption_key = Some(encryption_key_pair.plain_key.to_vec());
}
let download = storage.download(src_path, &opts, cancel).await?;
let download = storage
.download(src_path, &DownloadOpts::default(), cancel)
.await?;
pausable_failpoint!("before-downloading-layer-stream-pausable");

View File

@@ -10,8 +10,6 @@ use pageserver_api::models::AuxFilePolicy;
use pageserver_api::models::RelSizeMigration;
use pageserver_api::shard::ShardIndex;
use serde::{Deserialize, Serialize};
use serde_with::base64::Base64;
use serde_with::serde_as;
use utils::id::TimelineId;
use utils::lsn::Lsn;
@@ -116,70 +114,6 @@ pub struct IndexPart {
/// The timestamp when the timeline was marked invisible in synthetic size calculations.
#[serde(skip_serializing_if = "Option::is_none", default)]
pub(crate) marked_invisible_at: Option<NaiveDateTime>,
/// The encryption key used to encrypt the timeline layer files.
#[serde(skip_serializing_if = "Vec::is_empty", default)]
pub(crate) keys: Vec<EncryptionKey>,
}
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize, Ord, PartialOrd, Hash)]
pub struct KeyVersion(pub u32);
impl KeyVersion {
pub fn next(&self) -> Self {
Self(self.0 + 1)
}
}
/// An identifier for an encryption key. The scope of the key is the timeline (TBD).
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize, Ord, PartialOrd, Hash)]
pub struct EncryptionKeyId {
pub version: KeyVersion,
pub generation: Generation,
}
#[derive(Clone)]
pub struct EncryptionKeyPair {
pub id: EncryptionKeyId,
pub plain_key: Vec<u8>,
pub wrapped_key: Vec<u8>,
}
impl EncryptionKeyPair {
pub fn new(id: EncryptionKeyId, plain_key: Vec<u8>, wrapped_key: Vec<u8>) -> Self {
Self {
id,
plain_key,
wrapped_key,
}
}
}
impl std::fmt::Debug for EncryptionKeyPair {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let display =
base64::display::Base64Display::with_config(&self.wrapped_key, base64::STANDARD);
struct DisplayAsDebug<T: std::fmt::Display>(T);
impl<T: std::fmt::Display> std::fmt::Debug for DisplayAsDebug<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.0)
}
}
f.debug_struct("EncryptionKeyPair")
.field("id", &self.id)
.field("plain_key", &"<REDACTED>")
.field("wrapped_key", &DisplayAsDebug(&display))
.finish()
}
}
#[serde_as]
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
pub struct EncryptionKey {
#[serde_as(as = "Base64")]
pub key: Vec<u8>,
pub id: EncryptionKeyId,
pub created_at: NaiveDateTime,
}
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
@@ -208,12 +142,10 @@ impl IndexPart {
/// - 12: +l2_lsn
/// - 13: +gc_compaction
/// - 14: +marked_invisible_at
/// - 15: +keys and encryption_key in layer_metadata
const LATEST_VERSION: usize = 15;
const LATEST_VERSION: usize = 14;
// Versions we may see when reading from a bucket.
pub const KNOWN_VERSIONS: &'static [usize] =
&[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15];
pub const KNOWN_VERSIONS: &'static [usize] = &[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14];
pub const FILE_NAME: &'static str = "index_part.json";
@@ -233,7 +165,6 @@ impl IndexPart {
l2_lsn: None,
gc_compaction: None,
marked_invisible_at: None,
keys: Vec::new(),
}
}
@@ -274,16 +205,14 @@ impl IndexPart {
/// Check for invariants in the index: this is useful when uploading an index to ensure that if
/// we encounter a bug, we do not persist buggy metadata.
pub(crate) fn validate(&self) -> Result<(), String> {
// We have to disable this check: we might need to upload an empty index part with new keys, or new `reldirv2` flag.
// if self.import_pgdata.is_none()
// && self.metadata.ancestor_timeline().is_none()
// && self.layer_metadata.is_empty()
// {
// // Unless we're in the middle of a raw pgdata import, or this is a child timeline,the index must
// // always have at least one layer.
// return Err("Index has no ancestor and no layers".to_string());
// }
if self.import_pgdata.is_none()
&& self.metadata.ancestor_timeline().is_none()
&& self.layer_metadata.is_empty()
{
// Unless we're in the middle of a raw pgdata import, or this is a child timeline,the index must
// always have at least one layer.
return Err("Index has no ancestor and no layers".to_string());
}
Ok(())
}
@@ -293,7 +222,7 @@ impl IndexPart {
///
/// Fields have to be `Option`s because remote [`IndexPart`]'s can be from different version, which
/// might have less or more metadata depending if upgrading or rolling back an upgrade.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct LayerFileMetadata {
pub file_size: u64,
@@ -304,9 +233,6 @@ pub struct LayerFileMetadata {
#[serde(default = "ShardIndex::unsharded")]
#[serde(skip_serializing_if = "ShardIndex::is_unsharded")]
pub shard: ShardIndex,
#[serde(skip_serializing_if = "Option::is_none", default)]
pub encryption_key: Option<EncryptionKeyId>,
}
impl LayerFileMetadata {
@@ -315,7 +241,6 @@ impl LayerFileMetadata {
file_size,
generation,
shard,
encryption_key: None,
}
}
/// Helper to get both generation and file size in a tuple
@@ -528,16 +453,14 @@ mod tests {
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), LayerFileMetadata {
file_size: 25600000,
generation: Generation::none(),
shard: ShardIndex::unsharded(),
encryption_key: None,
shard: ShardIndex::unsharded()
}),
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), LayerFileMetadata {
// serde_json should always parse this but this might be a double with jq for
// example.
file_size: 9007199254741001,
generation: Generation::none(),
shard: ShardIndex::unsharded(),
encryption_key: None,
shard: ShardIndex::unsharded()
})
]),
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
@@ -552,7 +475,6 @@ mod tests {
l2_lsn: None,
gc_compaction: None,
marked_invisible_at: None,
keys: Vec::new(),
};
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
@@ -580,16 +502,14 @@ mod tests {
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), LayerFileMetadata {
file_size: 25600000,
generation: Generation::none(),
shard: ShardIndex::unsharded(),
encryption_key: None,
shard: ShardIndex::unsharded()
}),
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), LayerFileMetadata {
// serde_json should always parse this but this might be a double with jq for
// example.
file_size: 9007199254741001,
generation: Generation::none(),
shard: ShardIndex::unsharded(),
encryption_key: None,
shard: ShardIndex::unsharded()
})
]),
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
@@ -604,7 +524,6 @@ mod tests {
l2_lsn: None,
gc_compaction: None,
marked_invisible_at: None,
keys: Vec::new(),
};
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
@@ -633,16 +552,14 @@ mod tests {
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), LayerFileMetadata {
file_size: 25600000,
generation: Generation::none(),
shard: ShardIndex::unsharded(),
encryption_key: None,
shard: ShardIndex::unsharded()
}),
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), LayerFileMetadata {
// serde_json should always parse this but this might be a double with jq for
// example.
file_size: 9007199254741001,
generation: Generation::none(),
shard: ShardIndex::unsharded(),
encryption_key: None,
shard: ShardIndex::unsharded()
})
]),
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
@@ -657,7 +574,6 @@ mod tests {
l2_lsn: None,
gc_compaction: None,
marked_invisible_at: None,
keys: Vec::new(),
};
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
@@ -711,7 +627,6 @@ mod tests {
l2_lsn: None,
gc_compaction: None,
marked_invisible_at: None,
keys: Vec::new(),
};
let empty_layers_parsed = IndexPart::from_json_bytes(empty_layers_json.as_bytes()).unwrap();
@@ -738,16 +653,14 @@ mod tests {
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), LayerFileMetadata {
file_size: 25600000,
generation: Generation::none(),
shard: ShardIndex::unsharded(),
encryption_key: None,
shard: ShardIndex::unsharded()
}),
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), LayerFileMetadata {
// serde_json should always parse this but this might be a double with jq for
// example.
file_size: 9007199254741001,
generation: Generation::none(),
shard: ShardIndex::unsharded(),
encryption_key: None,
shard: ShardIndex::unsharded()
})
]),
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
@@ -762,7 +675,6 @@ mod tests {
l2_lsn: None,
gc_compaction: None,
marked_invisible_at: None,
keys: Vec::new(),
};
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
@@ -791,13 +703,11 @@ mod tests {
file_size: 23289856,
generation: Generation::new(1),
shard: ShardIndex::unsharded(),
encryption_key: None,
}),
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000014EF499-00000000015A7619".parse().unwrap(), LayerFileMetadata {
file_size: 1015808,
generation: Generation::new(1),
shard: ShardIndex::unsharded(),
encryption_key: None,
})
]),
disk_consistent_lsn: Lsn::from_str("0/15A7618").unwrap(),
@@ -816,7 +726,6 @@ mod tests {
l2_lsn: None,
gc_compaction: None,
marked_invisible_at: None,
keys: Vec::new(),
};
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
@@ -847,16 +756,14 @@ mod tests {
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), LayerFileMetadata {
file_size: 25600000,
generation: Generation::none(),
shard: ShardIndex::unsharded(),
encryption_key: None,
shard: ShardIndex::unsharded()
}),
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), LayerFileMetadata {
// serde_json should always parse this but this might be a double with jq for
// example.
file_size: 9007199254741001,
generation: Generation::none(),
shard: ShardIndex::unsharded(),
encryption_key: None,
shard: ShardIndex::unsharded()
})
]),
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
@@ -875,7 +782,6 @@ mod tests {
l2_lsn: None,
gc_compaction: None,
marked_invisible_at: None,
keys: Vec::new(),
};
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
@@ -909,14 +815,12 @@ mod tests {
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), LayerFileMetadata {
file_size: 25600000,
generation: Generation::none(),
shard: ShardIndex::unsharded(),
encryption_key: None,
shard: ShardIndex::unsharded()
}),
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), LayerFileMetadata {
file_size: 9007199254741001,
generation: Generation::none(),
shard: ShardIndex::unsharded(),
encryption_key: None,
shard: ShardIndex::unsharded()
})
]),
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
@@ -939,7 +843,6 @@ mod tests {
l2_lsn: None,
gc_compaction: None,
marked_invisible_at: None,
keys: Vec::new(),
};
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
@@ -974,14 +877,12 @@ mod tests {
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), LayerFileMetadata {
file_size: 25600000,
generation: Generation::none(),
shard: ShardIndex::unsharded(),
encryption_key: None,
shard: ShardIndex::unsharded()
}),
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), LayerFileMetadata {
file_size: 9007199254741001,
generation: Generation::none(),
shard: ShardIndex::unsharded(),
encryption_key: None,
shard: ShardIndex::unsharded()
})
]),
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
@@ -1004,7 +905,6 @@ mod tests {
l2_lsn: None,
gc_compaction: None,
marked_invisible_at: None,
keys: Vec::new(),
};
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
@@ -1041,14 +941,12 @@ mod tests {
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), LayerFileMetadata {
file_size: 25600000,
generation: Generation::none(),
shard: ShardIndex::unsharded(),
encryption_key: None,
shard: ShardIndex::unsharded()
}),
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), LayerFileMetadata {
file_size: 9007199254741001,
generation: Generation::none(),
shard: ShardIndex::unsharded(),
encryption_key: None,
shard: ShardIndex::unsharded()
})
]),
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
@@ -1074,7 +972,6 @@ mod tests {
l2_lsn: None,
gc_compaction: None,
marked_invisible_at: None,
keys: Vec::new(),
};
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
@@ -1120,14 +1017,12 @@ mod tests {
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), LayerFileMetadata {
file_size: 25600000,
generation: Generation::none(),
shard: ShardIndex::unsharded(),
encryption_key: None,
shard: ShardIndex::unsharded()
}),
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), LayerFileMetadata {
file_size: 9007199254741001,
generation: Generation::none(),
shard: ShardIndex::unsharded(),
encryption_key: None,
shard: ShardIndex::unsharded()
})
]),
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
@@ -1157,7 +1052,6 @@ mod tests {
l2_lsn: None,
gc_compaction: None,
marked_invisible_at: None,
keys: Vec::new(),
};
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
@@ -1204,14 +1098,12 @@ mod tests {
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), LayerFileMetadata {
file_size: 25600000,
generation: Generation::none(),
shard: ShardIndex::unsharded(),
encryption_key: None,
shard: ShardIndex::unsharded()
}),
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), LayerFileMetadata {
file_size: 9007199254741001,
generation: Generation::none(),
shard: ShardIndex::unsharded(),
encryption_key: None,
shard: ShardIndex::unsharded()
})
]),
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
@@ -1241,7 +1133,6 @@ mod tests {
l2_lsn: None,
gc_compaction: None,
marked_invisible_at: None,
keys: Vec::new(),
};
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
@@ -1292,14 +1183,12 @@ mod tests {
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), LayerFileMetadata {
file_size: 25600000,
generation: Generation::none(),
shard: ShardIndex::unsharded(),
encryption_key: None,
shard: ShardIndex::unsharded()
}),
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), LayerFileMetadata {
file_size: 9007199254741001,
generation: Generation::none(),
shard: ShardIndex::unsharded(),
encryption_key: None,
shard: ShardIndex::unsharded()
})
]),
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
@@ -1331,7 +1220,6 @@ mod tests {
last_completed_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
}),
marked_invisible_at: None,
keys: Vec::new(),
};
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
@@ -1383,14 +1271,12 @@ mod tests {
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), LayerFileMetadata {
file_size: 25600000,
generation: Generation::none(),
shard: ShardIndex::unsharded(),
encryption_key: None,
shard: ShardIndex::unsharded()
}),
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), LayerFileMetadata {
file_size: 9007199254741001,
generation: Generation::none(),
shard: ShardIndex::unsharded(),
encryption_key: None,
shard: ShardIndex::unsharded()
})
]),
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
@@ -1422,139 +1308,6 @@ mod tests {
last_completed_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
}),
marked_invisible_at: Some(parse_naive_datetime("2023-07-31T09:00:00.123000000")),
keys: Vec::new(),
};
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
assert_eq!(part, expected);
}
#[test]
fn v15_keys_are_parsed() {
let example = r#"{
"version": 15,
"layer_metadata":{
"000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9": { "file_size": 25600000, "encryption_key": { "version": 1, "generation": 5 } },
"000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51": { "file_size": 9007199254741001, "encryption_key": { "version": 2, "generation": 6 } }
},
"disk_consistent_lsn":"0/16960E8",
"metadata": {
"disk_consistent_lsn": "0/16960E8",
"prev_record_lsn": "0/1696070",
"ancestor_timeline": "e45a7f37d3ee2ff17dc14bf4f4e3f52e",
"ancestor_lsn": "0/0",
"latest_gc_cutoff_lsn": "0/1696070",
"initdb_lsn": "0/1696070",
"pg_version": 14
},
"gc_blocking": {
"started_at": "2024-07-19T09:00:00.123",
"reasons": ["DetachAncestor"]
},
"import_pgdata": {
"V1": {
"Done": {
"idempotency_key": "specified-by-client-218a5213-5044-4562-a28d-d024c5f057f5",
"started_at": "2024-11-13T09:23:42.123",
"finished_at": "2024-11-13T09:42:23.123"
}
}
},
"rel_size_migration": "legacy",
"l2_lsn": "0/16960E8",
"gc_compaction": {
"last_completed_lsn": "0/16960E8"
},
"marked_invisible_at": "2023-07-31T09:00:00.123",
"keys": [
{
"key": "dGVzdF9rZXk=",
"id": {
"version": 1,
"generation": 5
},
"created_at": "2024-07-19T09:00:00.123"
},
{
"key": "dGVzdF9rZXlfMg==",
"id": {
"version": 2,
"generation": 6
},
"created_at": "2024-07-19T10:00:00.123"
}
]
}"#;
let expected = IndexPart {
version: 15,
layer_metadata: HashMap::from([
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), LayerFileMetadata {
file_size: 25600000,
generation: Generation::none(),
shard: ShardIndex::unsharded(),
encryption_key: Some(EncryptionKeyId {
version: KeyVersion(1),
generation: Generation::Valid(5),
}),
}),
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), LayerFileMetadata {
file_size: 9007199254741001,
generation: Generation::none(),
shard: ShardIndex::unsharded(),
encryption_key: Some(EncryptionKeyId {
version: KeyVersion(2),
generation: Generation::Valid(6),
}),
})
]),
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
metadata: TimelineMetadata::new(
Lsn::from_str("0/16960E8").unwrap(),
Some(Lsn::from_str("0/1696070").unwrap()),
Some(TimelineId::from_str("e45a7f37d3ee2ff17dc14bf4f4e3f52e").unwrap()),
Lsn::INVALID,
Lsn::from_str("0/1696070").unwrap(),
Lsn::from_str("0/1696070").unwrap(),
14,
).with_recalculated_checksum().unwrap(),
deleted_at: None,
lineage: Default::default(),
gc_blocking: Some(GcBlocking {
started_at: parse_naive_datetime("2024-07-19T09:00:00.123000000"),
reasons: enumset::EnumSet::from_iter([GcBlockingReason::DetachAncestor]),
}),
last_aux_file_policy: Default::default(),
archived_at: None,
import_pgdata: Some(import_pgdata::index_part_format::Root::V1(import_pgdata::index_part_format::V1::Done(import_pgdata::index_part_format::Done{
started_at: parse_naive_datetime("2024-11-13T09:23:42.123000000"),
finished_at: parse_naive_datetime("2024-11-13T09:42:23.123000000"),
idempotency_key: import_pgdata::index_part_format::IdempotencyKey::new("specified-by-client-218a5213-5044-4562-a28d-d024c5f057f5".to_string()),
}))),
rel_size_migration: Some(RelSizeMigration::Legacy),
l2_lsn: Some("0/16960E8".parse::<Lsn>().unwrap()),
gc_compaction: Some(GcCompactionState {
last_completed_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
}),
marked_invisible_at: Some(parse_naive_datetime("2023-07-31T09:00:00.123000000")),
keys: vec![
EncryptionKey {
key: "test_key".as_bytes().to_vec(),
id: EncryptionKeyId {
version: KeyVersion(1),
generation: Generation::Valid(5),
},
created_at: parse_naive_datetime("2024-07-19T09:00:00.123000000"),
},
EncryptionKey {
key: "test_key_2".as_bytes().to_vec(),
id: EncryptionKeyId {
version: KeyVersion(2),
generation: Generation::Valid(6),
},
created_at: parse_naive_datetime("2024-07-19T10:00:00.123000000"),
}
],
};
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();

View File

@@ -17,7 +17,7 @@ use utils::id::{TenantId, TimelineId};
use utils::{backoff, pausable_failpoint};
use super::Generation;
use super::index::{EncryptionKeyPair, IndexPart};
use super::index::IndexPart;
use super::manifest::TenantManifest;
use crate::tenant::remote_timeline_client::{
remote_index_path, remote_initdb_archive_path, remote_initdb_preserved_archive_path,
@@ -101,7 +101,6 @@ pub(super) async fn upload_timeline_layer<'a>(
local_path: &'a Utf8Path,
remote_path: &'a RemotePath,
metadata_size: u64,
encryption_key_pair: Option<EncryptionKeyPair>,
cancel: &CancellationToken,
) -> anyhow::Result<()> {
fail_point!("before-upload-layer", |_| {
@@ -145,14 +144,7 @@ pub(super) async fn upload_timeline_layer<'a>(
let reader = tokio_util::io::ReaderStream::with_capacity(source_file, super::BUFFER_SIZE);
storage
.upload_with_encryption(
reader,
fs_size,
remote_path,
None,
encryption_key_pair.as_ref().map(|k| k.plain_key.as_slice()),
cancel,
)
.upload(reader, fs_size, remote_path, None, cancel)
.await
.with_context(|| format!("upload layer from local path '{local_path}'"))
}

View File

@@ -1310,7 +1310,6 @@ impl<'a> TenantDownloader<'a> {
let downloaded_bytes = download_layer_file(
self.conf,
self.remote_storage,
None, // TODO: add encryption key pair
*tenant_shard_id,
*timeline_id,
&layer.name,

View File

@@ -1285,6 +1285,10 @@ impl Timeline {
reconstruct_state: &mut ValuesReconstructState,
ctx: &RequestContext,
) -> Result<BTreeMap<Key, Result<Bytes, PageReconstructError>>, GetVectoredError> {
if query.is_empty() {
return Ok(BTreeMap::default());
}
let read_path = if self.conf.enable_read_path_debugging || ctx.read_path_debug() {
Some(ReadPath::new(
query.total_keyspace(),
@@ -4864,7 +4868,6 @@ impl Timeline {
else {
panic!("delta layer cannot be empty if no filter is applied");
};
(
// FIXME: even though we have a single image and single delta layer assumption
// we push them to vec
@@ -6933,7 +6936,9 @@ impl Timeline {
}
// Update remote_timeline_client state to reflect existence of this layer
self.remote_client.schedule_layer_file_upload(image_layer)?;
self.remote_client
.schedule_layer_file_upload(image_layer)
.unwrap();
Ok(())
}
@@ -6994,7 +6999,9 @@ impl Timeline {
}
// Update remote_timeline_client state to reflect existence of this layer
self.remote_client.schedule_layer_file_upload(delta_layer)?;
self.remote_client
.schedule_layer_file_upload(delta_layer)
.unwrap();
Ok(())
}

View File

@@ -7,7 +7,7 @@
use std::collections::{BinaryHeap, HashMap, HashSet, VecDeque};
use std::ops::{Deref, Range};
use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime};
use std::time::{Duration, Instant};
use super::layer_manager::LayerManager;
use super::{
@@ -119,25 +119,32 @@ pub struct GcCompactionMetaStatistics {
/// The layer size after compaction.
pub after_compaction_layer_size: u64,
/// The start time of the meta job.
pub start_time: Option<SystemTime>,
pub start_time: Option<chrono::DateTime<chrono::Utc>>,
/// The end time of the meta job.
pub end_time: Option<SystemTime>,
pub end_time: Option<chrono::DateTime<chrono::Utc>>,
/// The duration of the meta job.
pub duration_secs: f64,
/// The id of the meta job.
pub meta_job_id: GcCompactionJobId,
/// The LSN below which the layers are compacted, used to compute the statistics.
pub below_lsn: Lsn,
/// The retention ratio of the meta job (after_compaction_layer_size / before_compaction_layer_size)
pub retention_ratio: f64,
}
impl GcCompactionMetaStatistics {
fn finalize(&mut self) {
let end_time = SystemTime::now();
let end_time = chrono::Utc::now();
if let Some(start_time) = self.start_time {
if let Ok(duration) = end_time.duration_since(start_time) {
self.duration_secs = duration.as_secs_f64();
if end_time > start_time {
let delta = end_time - start_time;
if let Ok(std_dur) = delta.to_std() {
self.duration_secs = std_dur.as_secs_f64();
}
}
}
self.retention_ratio = self.after_compaction_layer_size as f64
/ (self.before_compaction_layer_size as f64 + 1.0);
self.end_time = Some(end_time);
}
}
@@ -520,7 +527,7 @@ impl GcCompactionQueue {
}
guard.meta_statistics = Some(GcCompactionMetaStatistics {
meta_job_id: id,
start_time: Some(SystemTime::now()),
start_time: Some(chrono::Utc::now()),
before_compaction_layer_size: layer_size,
below_lsn: expected_l2_lsn,
total_sub_compaction_jobs: jobs_len,
@@ -1291,7 +1298,6 @@ impl Timeline {
.parts
.extend(sparse_partitioning.into_dense().parts);
// 3. Create new image layers for partitions that have been modified "enough".
let (image_layers, outcome) = self
.create_image_layers(

View File

@@ -244,8 +244,7 @@ impl RemoteStorageWrapper {
kind: DownloadKind::Large,
etag: None,
byte_start: Bound::Included(start_inclusive),
byte_end: Bound::Excluded(end_exclusive),
encryption_key: None,
byte_end: Bound::Excluded(end_exclusive)
},
&self.cancel)
.await?;

View File

@@ -9,7 +9,6 @@ use tracing::info;
use utils::generation::Generation;
use utils::lsn::{AtomicLsn, Lsn};
use super::remote_timeline_client::index::EncryptionKeyPair;
use super::remote_timeline_client::is_same_remote_layer_path;
use super::storage_layer::{AsLayerDesc as _, LayerName, ResidentLayer};
use crate::tenant::metadata::TimelineMetadata;
@@ -246,7 +245,7 @@ impl UploadQueueInitialized {
pub(crate) fn num_inprogress_layer_uploads(&self) -> usize {
self.inprogress_tasks
.iter()
.filter(|(_, t)| matches!(t.op, UploadOp::UploadLayer(_, _, _, _)))
.filter(|(_, t)| matches!(t.op, UploadOp::UploadLayer(_, _, _)))
.count()
}
@@ -462,12 +461,7 @@ pub struct Delete {
#[derive(Clone, Debug)]
pub enum UploadOp {
/// Upload a layer file. The last field indicates the last operation for this file.
UploadLayer(
ResidentLayer,
LayerFileMetadata,
Option<EncryptionKeyPair>,
Option<OpType>,
),
UploadLayer(ResidentLayer, LayerFileMetadata, Option<OpType>),
/// Upload a index_part.json file
UploadMetadata {
@@ -489,7 +483,7 @@ pub enum UploadOp {
impl std::fmt::Display for UploadOp {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
match self {
UploadOp::UploadLayer(layer, metadata, _, mode) => {
UploadOp::UploadLayer(layer, metadata, mode) => {
write!(
f,
"UploadLayer({}, size={:?}, gen={:?}, mode={:?})",
@@ -523,13 +517,13 @@ impl UploadOp {
(UploadOp::Shutdown, _) | (_, UploadOp::Shutdown) => false,
// Uploads and deletes can bypass each other unless they're for the same file.
(UploadOp::UploadLayer(a, ameta, _, _), UploadOp::UploadLayer(b, bmeta, _, _)) => {
(UploadOp::UploadLayer(a, ameta, _), UploadOp::UploadLayer(b, bmeta, _)) => {
let aname = &a.layer_desc().layer_name();
let bname = &b.layer_desc().layer_name();
!is_same_remote_layer_path(aname, ameta, bname, bmeta)
}
(UploadOp::UploadLayer(u, umeta, _, _), UploadOp::Delete(d))
| (UploadOp::Delete(d), UploadOp::UploadLayer(u, umeta, _, _)) => {
(UploadOp::UploadLayer(u, umeta, _), UploadOp::Delete(d))
| (UploadOp::Delete(d), UploadOp::UploadLayer(u, umeta, _)) => {
d.layers.iter().all(|(dname, dmeta)| {
!is_same_remote_layer_path(&u.layer_desc().layer_name(), umeta, dname, dmeta)
})
@@ -545,8 +539,8 @@ impl UploadOp {
// Similarly, index uploads can bypass uploads and deletes as long as neither the
// uploaded index nor the active index references the file (the latter would be
// incorrect use by the caller).
(UploadOp::UploadLayer(u, umeta, _, _), UploadOp::UploadMetadata { uploaded: i })
| (UploadOp::UploadMetadata { uploaded: i }, UploadOp::UploadLayer(u, umeta, _, _)) => {
(UploadOp::UploadLayer(u, umeta, _), UploadOp::UploadMetadata { uploaded: i })
| (UploadOp::UploadMetadata { uploaded: i }, UploadOp::UploadLayer(u, umeta, _)) => {
let uname = u.layer_desc().layer_name();
!i.references(&uname, umeta) && !index.references(&uname, umeta)
}
@@ -583,7 +577,7 @@ mod tests {
fn assert_same_op(a: &UploadOp, b: &UploadOp) {
use UploadOp::*;
match (a, b) {
(UploadLayer(a, ameta, _, atype), UploadLayer(b, bmeta, _, btype)) => {
(UploadLayer(a, ameta, atype), UploadLayer(b, bmeta, btype)) => {
assert_eq!(a.layer_desc().layer_name(), b.layer_desc().layer_name());
assert_eq!(ameta, bmeta);
assert_eq!(atype, btype);
@@ -647,7 +641,6 @@ mod tests {
generation: timeline.generation,
shard: timeline.get_shard_index(),
file_size: size as u64,
encryption_key: None,
};
make_layer_with_metadata(timeline, name, metadata)
}
@@ -717,7 +710,7 @@ mod tests {
// Enqueue non-conflicting upload, delete, and index before and after a barrier.
let ops = [
UploadOp::UploadLayer(layer0.clone(), layer0.metadata(), None, None),
UploadOp::UploadLayer(layer0.clone(), layer0.metadata(), None),
UploadOp::Delete(Delete {
layers: vec![(layer1.layer_desc().layer_name(), layer1.metadata())],
}),
@@ -725,7 +718,7 @@ mod tests {
uploaded: index.clone(),
},
UploadOp::Barrier(barrier),
UploadOp::UploadLayer(layer2.clone(), layer2.metadata(), None, None),
UploadOp::UploadLayer(layer2.clone(), layer2.metadata(), None),
UploadOp::Delete(Delete {
layers: vec![(layer3.layer_desc().layer_name(), layer3.metadata())],
}),
@@ -850,9 +843,9 @@ mod tests {
);
let ops = [
UploadOp::UploadLayer(layer0a.clone(), layer0a.metadata(), None, None),
UploadOp::UploadLayer(layer0b.clone(), layer0b.metadata(), None, None),
UploadOp::UploadLayer(layer0c.clone(), layer0c.metadata(), None, None),
UploadOp::UploadLayer(layer0a.clone(), layer0a.metadata(), None),
UploadOp::UploadLayer(layer0b.clone(), layer0b.metadata(), None),
UploadOp::UploadLayer(layer0c.clone(), layer0c.metadata(), None),
];
queue.queued_operations.extend(ops.clone());
@@ -889,14 +882,14 @@ mod tests {
);
let ops = [
UploadOp::UploadLayer(layer0.clone(), layer0.metadata(), None, None),
UploadOp::UploadLayer(layer0.clone(), layer0.metadata(), None),
UploadOp::Delete(Delete {
layers: vec![
(layer0.layer_desc().layer_name(), layer0.metadata()),
(layer1.layer_desc().layer_name(), layer1.metadata()),
],
}),
UploadOp::UploadLayer(layer1.clone(), layer1.metadata(), None, None),
UploadOp::UploadLayer(layer1.clone(), layer1.metadata(), None),
];
queue.queued_operations.extend(ops.clone());
@@ -945,15 +938,15 @@ mod tests {
);
let ops = [
UploadOp::UploadLayer(layer0.clone(), layer0.metadata(), None, None),
UploadOp::UploadLayer(layer0.clone(), layer0.metadata(), None),
UploadOp::Delete(Delete {
layers: vec![
(layer0.layer_desc().layer_name(), layer0.metadata()),
(layer1.layer_desc().layer_name(), layer1.metadata()),
],
}),
UploadOp::UploadLayer(layer1.clone(), layer1.metadata(), None, None),
UploadOp::UploadLayer(layer2.clone(), layer2.metadata(), None, None),
UploadOp::UploadLayer(layer1.clone(), layer1.metadata(), None),
UploadOp::UploadLayer(layer2.clone(), layer2.metadata(), None),
UploadOp::Delete(Delete {
layers: vec![(layer3.layer_desc().layer_name(), layer3.metadata())],
}),
@@ -991,9 +984,9 @@ mod tests {
);
let ops = [
UploadOp::UploadLayer(layer0.clone(), layer0.metadata(), None, None),
UploadOp::UploadLayer(layer1.clone(), layer1.metadata(), None, None),
UploadOp::UploadLayer(layer2.clone(), layer2.metadata(), None, None),
UploadOp::UploadLayer(layer0.clone(), layer0.metadata(), None),
UploadOp::UploadLayer(layer1.clone(), layer1.metadata(), None),
UploadOp::UploadLayer(layer2.clone(), layer2.metadata(), None),
];
queue.queued_operations.extend(ops.clone());
@@ -1068,15 +1061,15 @@ mod tests {
let index2 = index_with(&index1, &layer2);
let ops = [
UploadOp::UploadLayer(layer0.clone(), layer0.metadata(), None, None),
UploadOp::UploadLayer(layer0.clone(), layer0.metadata(), None),
UploadOp::UploadMetadata {
uploaded: index0.clone(),
},
UploadOp::UploadLayer(layer1.clone(), layer1.metadata(), None, None),
UploadOp::UploadLayer(layer1.clone(), layer1.metadata(), None),
UploadOp::UploadMetadata {
uploaded: index1.clone(),
},
UploadOp::UploadLayer(layer2.clone(), layer2.metadata(), None, None),
UploadOp::UploadLayer(layer2.clone(), layer2.metadata(), None),
UploadOp::UploadMetadata {
uploaded: index2.clone(),
},
@@ -1135,7 +1128,7 @@ mod tests {
let ops = [
// Initial upload, with a barrier to prevent index coalescing.
UploadOp::UploadLayer(layer.clone(), layer.metadata(), None, None),
UploadOp::UploadLayer(layer.clone(), layer.metadata(), None),
UploadOp::UploadMetadata {
uploaded: index_upload.clone(),
},
@@ -1184,7 +1177,7 @@ mod tests {
let ops = [
// Initial upload, with a barrier to prevent index coalescing.
UploadOp::UploadLayer(layer.clone(), layer.metadata(), None, None),
UploadOp::UploadLayer(layer.clone(), layer.metadata(), None),
UploadOp::UploadMetadata {
uploaded: index_upload.clone(),
},
@@ -1194,7 +1187,7 @@ mod tests {
uploaded: index_deref.clone(),
},
// Replace and reference the layer.
UploadOp::UploadLayer(layer.clone(), layer.metadata(), None, None),
UploadOp::UploadLayer(layer.clone(), layer.metadata(), None),
UploadOp::UploadMetadata {
uploaded: index_ref.clone(),
},
@@ -1242,7 +1235,7 @@ mod tests {
// Enqueue non-conflicting upload, delete, and index before and after a shutdown.
let ops = [
UploadOp::UploadLayer(layer0.clone(), layer0.metadata(), None, None),
UploadOp::UploadLayer(layer0.clone(), layer0.metadata(), None),
UploadOp::Delete(Delete {
layers: vec![(layer1.layer_desc().layer_name(), layer1.metadata())],
}),
@@ -1250,7 +1243,7 @@ mod tests {
uploaded: index.clone(),
},
UploadOp::Shutdown,
UploadOp::UploadLayer(layer2.clone(), layer2.metadata(), None, None),
UploadOp::UploadLayer(layer2.clone(), layer2.metadata(), None),
UploadOp::Delete(Delete {
layers: vec![(layer3.layer_desc().layer_name(), layer3.metadata())],
}),
@@ -1312,10 +1305,10 @@ mod tests {
);
let ops = [
UploadOp::UploadLayer(layer0.clone(), layer0.metadata(), None, None),
UploadOp::UploadLayer(layer1.clone(), layer1.metadata(), None, None),
UploadOp::UploadLayer(layer2.clone(), layer2.metadata(), None, None),
UploadOp::UploadLayer(layer3.clone(), layer3.metadata(), None, None),
UploadOp::UploadLayer(layer0.clone(), layer0.metadata(), None),
UploadOp::UploadLayer(layer1.clone(), layer1.metadata(), None),
UploadOp::UploadLayer(layer2.clone(), layer2.metadata(), None),
UploadOp::UploadLayer(layer3.clone(), layer3.metadata(), None),
];
queue.queued_operations.extend(ops.clone());
@@ -1366,7 +1359,7 @@ mod tests {
.layer_metadata
.insert(layer.layer_desc().layer_name(), layer.metadata());
vec![
UploadOp::UploadLayer(layer.clone(), layer.metadata(), None, None),
UploadOp::UploadLayer(layer.clone(), layer.metadata(), None),
UploadOp::Delete(Delete {
layers: vec![(layer.layer_desc().layer_name(), layer.metadata())],
}),
@@ -1385,7 +1378,6 @@ mod tests {
shard,
generation: Generation::Valid(generation),
file_size: 0,
encryption_key: None,
};
make_layer_with_metadata(&tli, name, metadata)
};

View File

@@ -126,6 +126,7 @@ pub(crate) enum DatabaseOperation {
InsertTimelineReconcile,
RemoveTimelineReconcile,
ListTimelineReconcile,
ListTimelineReconcileStartup,
}
#[must_use]
@@ -1521,23 +1522,41 @@ impl Persistence {
.await
}
/// Load pending operations from db.
pub(crate) async fn list_pending_ops(
/// Load pending operations from db, joined together with timeline data.
pub(crate) async fn list_pending_ops_with_timelines(
&self,
) -> DatabaseResult<Vec<TimelinePendingOpPersistence>> {
) -> DatabaseResult<Vec<(TimelinePendingOpPersistence, Option<TimelinePersistence>)>> {
use crate::schema::safekeeper_timeline_pending_ops::dsl;
use crate::schema::timelines;
let timeline_from_db = self
.with_measured_conn(DatabaseOperation::ListTimelineReconcile, move |conn| {
Box::pin(async move {
let from_db: Vec<TimelinePendingOpPersistence> =
dsl::safekeeper_timeline_pending_ops.load(conn).await?;
Ok(from_db)
})
})
.with_measured_conn(
DatabaseOperation::ListTimelineReconcileStartup,
move |conn| {
Box::pin(async move {
let from_db: Vec<(TimelinePendingOpPersistence, Option<TimelineFromDb>)> =
dsl::safekeeper_timeline_pending_ops
.left_join(
timelines::table.on(timelines::tenant_id
.eq(dsl::tenant_id)
.and(timelines::timeline_id.eq(dsl::timeline_id))),
)
.select((
TimelinePendingOpPersistence::as_select(),
Option::<TimelineFromDb>::as_select(),
))
.load(conn)
.await?;
Ok(from_db)
})
},
)
.await?;
Ok(timeline_from_db)
Ok(timeline_from_db
.into_iter()
.map(|(op, tl_opt)| (op, tl_opt.map(|tl_opt| tl_opt.into_persistence())))
.collect())
}
/// List pending operations for a given timeline (including tenant-global ones)
pub(crate) async fn list_pending_ops_for_timeline(
@@ -1580,7 +1599,7 @@ impl Persistence {
let tenant_id = &tenant_id;
let timeline_id = &timeline_id;
self.with_measured_conn(DatabaseOperation::ListTimelineReconcile, move |conn| {
self.with_measured_conn(DatabaseOperation::RemoveTimelineReconcile, move |conn| {
let timeline_id_str = timeline_id.map(|tid| tid.to_string()).unwrap_or_default();
Box::pin(async move {
diesel::delete(dsl::safekeeper_timeline_pending_ops)

View File

@@ -824,9 +824,13 @@ impl Service {
let mut locked = self.inner.write().unwrap();
locked.become_leader();
for (sk_id, _sk) in locked.safekeepers.clone().iter() {
locked.safekeeper_reconcilers.start_reconciler(*sk_id, self);
}
locked
.safekeeper_reconcilers
.schedule_request_vec(self, sk_schedule_requests);
.schedule_request_vec(sk_schedule_requests);
}
// TODO: if any tenant's intent now differs from its loaded generation_pageserver, we should clear that

View File

@@ -30,31 +30,35 @@ impl SafekeeperReconcilers {
reconcilers: HashMap::new(),
}
}
pub(crate) fn schedule_request_vec(
&mut self,
service: &Arc<Service>,
reqs: Vec<ScheduleRequest>,
) {
/// Adds a safekeeper-specific reconciler.
/// Can be called multiple times, but it needs to be called at least once
/// for every new safekeeper added.
pub(crate) fn start_reconciler(&mut self, node_id: NodeId, service: &Arc<Service>) {
self.reconcilers.entry(node_id).or_insert_with(|| {
SafekeeperReconciler::spawn(self.cancel.child_token(), service.clone())
});
}
/// Stop a safekeeper-specific reconciler.
/// Stops the reconciler, cancelling all ongoing tasks.
pub(crate) fn stop_reconciler(&mut self, node_id: NodeId) {
if let Some(handle) = self.reconcilers.remove(&node_id) {
handle.cancel.cancel();
}
}
pub(crate) fn schedule_request_vec(&self, reqs: Vec<ScheduleRequest>) {
tracing::info!(
"Scheduling {} pending safekeeper ops loaded from db",
reqs.len()
);
for req in reqs {
self.schedule_request(service, req);
self.schedule_request(req);
}
}
pub(crate) fn schedule_request(&mut self, service: &Arc<Service>, req: ScheduleRequest) {
pub(crate) fn schedule_request(&self, req: ScheduleRequest) {
let node_id = req.safekeeper.get_id();
let reconciler_handle = self.reconcilers.entry(node_id).or_insert_with(|| {
SafekeeperReconciler::spawn(self.cancel.child_token(), service.clone())
});
let reconciler_handle = self.reconcilers.get(&node_id).unwrap();
reconciler_handle.schedule_reconcile(req);
}
pub(crate) fn cancel_safekeeper(&mut self, node_id: NodeId) {
if let Some(handle) = self.reconcilers.remove(&node_id) {
handle.cancel.cancel();
}
}
/// Cancel ongoing reconciles for the given timeline
///
/// Specifying `None` here only removes reconciles for the tenant-global reconciliation,
@@ -78,9 +82,12 @@ pub(crate) async fn load_schedule_requests(
service: &Arc<Service>,
safekeepers: &HashMap<NodeId, Safekeeper>,
) -> anyhow::Result<Vec<ScheduleRequest>> {
let pending_ops = service.persistence.list_pending_ops().await?;
let mut res = Vec::with_capacity(pending_ops.len());
for op_persist in pending_ops {
let pending_ops_timelines = service
.persistence
.list_pending_ops_with_timelines()
.await?;
let mut res = Vec::with_capacity(pending_ops_timelines.len());
for (op_persist, timeline_persist) in pending_ops_timelines {
let node_id = NodeId(op_persist.sk_id as u64);
let Some(sk) = safekeepers.get(&node_id) else {
// This shouldn't happen, at least the safekeeper should exist as decomissioned.
@@ -102,16 +109,12 @@ pub(crate) async fn load_schedule_requests(
SafekeeperTimelineOpKind::Delete => Vec::new(),
SafekeeperTimelineOpKind::Exclude => Vec::new(),
SafekeeperTimelineOpKind::Pull => {
// TODO this code is super hacky, it doesn't take migrations into account
let Some(timeline_id) = timeline_id else {
if timeline_id.is_none() {
// We only do this extra check (outside of timeline_persist check) to give better error msgs
anyhow::bail!(
"timeline_id is empty for `pull` schedule request for {tenant_id}"
);
};
let timeline_persist = service
.persistence
.get_timeline(tenant_id, timeline_id)
.await?;
let Some(timeline_persist) = timeline_persist else {
// This shouldn't happen, the timeline should still exist
tracing::warn!(
@@ -163,6 +166,7 @@ pub(crate) struct ScheduleRequest {
pub(crate) kind: SafekeeperTimelineOpKind,
}
/// Handle to per safekeeper reconciler.
struct ReconcilerHandle {
tx: UnboundedSender<(ScheduleRequest, CancellationToken)>,
ongoing_tokens: Arc<ClashMap<(TenantId, Option<TimelineId>), CancellationToken>>,
@@ -170,7 +174,10 @@ struct ReconcilerHandle {
}
impl ReconcilerHandle {
/// Obtain a new token slot, cancelling any existing reconciliations for that timeline
/// Obtain a new token slot, cancelling any existing reconciliations for
/// that timeline. It is not useful to have >1 operation per <tenant_id,
/// timeline_id, safekeeper>, hence scheduling a new op cancels the current one if it
/// exists.
fn new_token_slot(
&self,
tenant_id: TenantId,
@@ -305,15 +312,16 @@ impl SafekeeperReconciler {
SafekeeperTimelineOpKind::Delete => {
let tenant_id = req.tenant_id;
if let Some(timeline_id) = req.timeline_id {
let deleted = self.reconcile_inner(
req,
async |client| client.delete_timeline(tenant_id, timeline_id).await,
|_resp| {
tracing::info!(%tenant_id, %timeline_id, "deleted timeline from {req_host}");
},
req_cancel,
)
.await;
let deleted = self
.reconcile_inner(
req,
async |client| client.delete_timeline(tenant_id, timeline_id).await,
|_resp| {
tracing::info!("deleted timeline from {req_host}");
},
req_cancel,
)
.await;
if deleted {
self.delete_timeline_from_db(tenant_id, timeline_id).await;
}
@@ -344,12 +352,13 @@ impl SafekeeperReconciler {
{
Ok(list) => {
if !list.is_empty() {
tracing::info!(%tenant_id, %timeline_id, "not deleting timeline from db as there are {} open reconciles", list.len());
// duplicate the timeline_id here because it might be None in the reconcile context
tracing::info!(%timeline_id, "not deleting timeline from db as there is {} open reconciles", list.len());
return;
}
}
Err(e) => {
tracing::warn!(%tenant_id, %timeline_id, "couldn't query pending ops: {e}");
tracing::warn!(%timeline_id, "couldn't query pending ops: {e}");
return;
}
}

View File

@@ -46,6 +46,7 @@ impl Service {
.map(SecretString::from);
let mut joinset = JoinSet::new();
// Prepare membership::Configuration from chosen safekeepers.
let safekeepers = {
let locked = self.inner.read().unwrap();
locked.safekeepers.clone()
@@ -205,7 +206,7 @@ impl Service {
tenant_id: tenant_id.to_string(),
timeline_id: timeline_id.to_string(),
start_lsn: start_lsn.into(),
generation: 0,
generation: 1,
sk_set: sks_persistence.clone(),
new_sk_set: None,
cplane_notified_generation: 0,
@@ -254,7 +255,7 @@ impl Service {
self.persistence.insert_pending_op(pending_op).await?;
}
if !remaining.is_empty() {
let mut locked = self.inner.write().unwrap();
let locked = self.inner.read().unwrap();
for remaining_id in remaining {
let Some(sk) = locked.safekeepers.get(&remaining_id) else {
return Err(ApiError::InternalServerError(anyhow::anyhow!(
@@ -290,7 +291,7 @@ impl Service {
generation: timeline_persist.generation as u32,
kind: crate::persistence::SafekeeperTimelineOpKind::Pull,
};
locked.safekeeper_reconcilers.schedule_request(self, req);
locked.safekeeper_reconcilers.schedule_request(req);
}
}
@@ -357,7 +358,7 @@ impl Service {
let pending_op = TimelinePendingOpPersistence {
tenant_id: tenant_id.to_string(),
timeline_id: timeline_id.to_string(),
generation: tl.generation,
generation: i32::MAX,
op_kind: SafekeeperTimelineOpKind::Delete,
sk_id: *sk_id,
};
@@ -365,7 +366,7 @@ impl Service {
self.persistence.insert_pending_op(pending_op).await?;
}
{
let mut locked = self.inner.write().unwrap();
let locked = self.inner.read().unwrap();
for sk_id in all_sks {
let sk_id = NodeId(*sk_id as u64);
let Some(sk) = locked.safekeepers.get(&sk_id) else {
@@ -383,7 +384,7 @@ impl Service {
generation: tl.generation as u32,
kind: SafekeeperTimelineOpKind::Delete,
};
locked.safekeeper_reconcilers.schedule_request(self, req);
locked.safekeeper_reconcilers.schedule_request(req);
}
}
Ok(())
@@ -482,7 +483,7 @@ impl Service {
tenant_id,
timeline_id: None,
};
locked.safekeeper_reconcilers.schedule_request(self, req);
locked.safekeeper_reconcilers.schedule_request(req);
}
Ok(())
}
@@ -579,7 +580,7 @@ impl Service {
}
pub(crate) async fn upsert_safekeeper(
&self,
self: &Arc<Service>,
record: crate::persistence::SafekeeperUpsert,
) -> Result<(), ApiError> {
let node_id = NodeId(record.id as u64);
@@ -618,6 +619,9 @@ impl Service {
);
}
}
locked
.safekeeper_reconcilers
.start_reconciler(node_id, self);
locked.safekeepers = Arc::new(safekeepers);
metrics::METRICS_REGISTRY
.metrics_group
@@ -638,7 +642,7 @@ impl Service {
}
pub(crate) async fn set_safekeeper_scheduling_policy(
&self,
self: &Arc<Service>,
id: i64,
scheduling_policy: SkSchedulingPolicy,
) -> Result<(), DatabaseError> {
@@ -656,9 +660,13 @@ impl Service {
sk.set_scheduling_policy(scheduling_policy);
match scheduling_policy {
SkSchedulingPolicy::Active => (),
SkSchedulingPolicy::Active => {
locked
.safekeeper_reconcilers
.start_reconciler(node_id, self);
}
SkSchedulingPolicy::Decomissioned | SkSchedulingPolicy::Pause => {
locked.safekeeper_reconcilers.cancel_safekeeper(node_id);
locked.safekeeper_reconcilers.stop_reconciler(node_id);
}
}

View File

@@ -22,19 +22,62 @@ def connection_parameters_to_env(params: dict[str, str]) -> dict[str, str]:
}
# Some API calls are not yet implemented.
# You may want to copy not-yet-implemented methods from the PR https://github.com/neondatabase/neon/pull/11305
class NeonAPI:
def __init__(self, neon_api_key: str, neon_api_base_url: str):
self.__neon_api_key = neon_api_key
self.__neon_api_base_url = neon_api_base_url.strip("/")
self.retry_if_possible = False
self.attempts = 10
self.sleep_before_retry = 1
self.retries524 = 0
self.retries4xx = 0
def __request(self, method: str | bytes, endpoint: str, **kwargs: Any) -> requests.Response:
if "headers" not in kwargs:
kwargs["headers"] = {}
kwargs["headers"] = kwargs.get("headers", {})
kwargs["headers"]["Authorization"] = f"Bearer {self.__neon_api_key}"
resp = requests.request(method, f"{self.__neon_api_base_url}{endpoint}", **kwargs)
log.debug("%s %s returned a %d: %s", method, endpoint, resp.status_code, resp.text)
resp.raise_for_status()
for attempt in range(self.attempts):
retry = False
resp = requests.request(method, f"{self.__neon_api_base_url}{endpoint}", **kwargs)
if resp.status_code >= 400:
log.error(
"%s %s returned a %d: %s",
method,
endpoint,
resp.status_code,
resp.text if resp.status_code != 524 else "CloudFlare error page",
)
else:
log.debug("%s %s returned a %d: %s", method, endpoint, resp.status_code, resp.text)
if not self.retry_if_possible:
resp.raise_for_status()
break
elif resp.status_code >= 400:
if resp.status_code == 422:
if resp.json()["message"] == "branch not ready yet":
retry = True
self.retries4xx += 1
elif resp.status_code == 423 and resp.json()["message"] in {
"endpoint is in some transitive state, could not suspend",
"project already has running conflicting operations, scheduling of new ones is prohibited",
}:
retry = True
self.retries4xx += 1
elif resp.status_code == 524:
log.info("The request was timed out, trying to get operations")
retry = True
self.retries524 += 1
if retry:
log.info("Retrying, attempt %s/%s", attempt + 1, self.attempts)
time.sleep(self.sleep_before_retry)
continue
else:
resp.raise_for_status()
break
else:
raise RuntimeError("Max retry count is reached")
return resp
@@ -101,6 +144,96 @@ class NeonAPI:
return cast("dict[str, Any]", resp.json())
def create_branch(
self,
project_id: str,
branch_name: str | None = None,
parent_id: str | None = None,
parent_lsn: str | None = None,
parent_timestamp: str | None = None,
protected: bool | None = None,
archived: bool | None = None,
init_source: str | None = None,
add_endpoint=True,
) -> dict[str, Any]:
data: dict[str, Any] = {}
if add_endpoint:
data["endpoints"] = [{"type": "read_write"}]
data["branch"] = {}
if parent_id:
data["branch"]["parent_id"] = parent_id
if branch_name:
data["branch"]["name"] = branch_name
if parent_lsn is not None:
data["branch"]["parent_lsn"] = parent_lsn
if parent_timestamp is not None:
data["branch"]["parent_timestamp"] = parent_timestamp
if protected is not None:
data["branch"]["protected"] = protected
if init_source is not None:
data["branch"]["init_source"] = init_source
if archived is not None:
data["branch"]["archived"] = archived
if not data["branch"]:
data.pop("branch")
resp = self.__request(
"POST",
f"/projects/{project_id}/branches",
headers={
"Accept": "application/json",
"Content-Type": "application/json",
},
json=data,
)
return cast("dict[str, Any]", resp.json())
def get_branch_details(self, project_id: str, branch_id: str) -> dict[str, Any]:
resp = self.__request(
"GET",
f"/projects/{project_id}/branches/{branch_id}",
headers={
"Accept": "application/json",
},
)
return cast("dict[str, Any]", resp.json())
def delete_branch(self, project_id: str, branch_id: str) -> dict[str, Any]:
resp = self.__request(
"DELETE",
f"/projects/{project_id}/branches/{branch_id}",
headers={
"Accept": "application/json",
},
)
return cast("dict[str, Any]", resp.json())
def restore_branch(
self,
project_id: str,
branch_id: str,
source_branch_id: str,
source_lsn: str | None,
source_timestamp: str | None,
preserve_under_name: str | None,
):
data = {"source_branch_id": source_branch_id}
if source_lsn:
data["source_lsn"] = source_lsn
if source_timestamp:
data["source_timestamp"] = source_timestamp
if preserve_under_name:
data["preserve_under_name"] = preserve_under_name
log.info("Data: %s", data)
resp = self.__request(
"POST",
f"/projects/{project_id}/branches/{branch_id}/restore",
headers={
"Accept": "application/json",
},
json=data,
)
return cast("dict[str, Any]", resp.json())
def start_endpoint(
self,
project_id: str,
@@ -176,6 +309,10 @@ class NeonAPI:
return cast("dict[str, Any]", resp.json())
def delete_endpoint(self, project_id: str, endpoint_id: str) -> dict[str, Any]:
resp = self.__request("DELETE", f"/projects/{project_id}/endpoints/{endpoint_id}")
return cast("dict[str,Any]", resp.json())
def get_connection_uri(
self,
project_id: str,

View File

@@ -3185,6 +3185,7 @@ class PgBin:
command: list[str],
env: Env | None = None,
cwd: str | Path | None = None,
stderr_pipe: Any | None = None,
) -> subprocess.Popen[Any]:
"""
Run one of the postgres binaries, not waiting for it to finish
@@ -3202,7 +3203,9 @@ class PgBin:
log.info(f"Running command '{' '.join(command)}'")
env = self._build_env(env)
self._log_env(env)
return subprocess.Popen(command, env=env, cwd=cwd, stdout=subprocess.PIPE, text=True)
return subprocess.Popen(
command, env=env, cwd=cwd, stdout=subprocess.PIPE, stderr=stderr_pipe, text=True
)
def run(
self,

View File

@@ -0,0 +1,93 @@
# Random Operations Test for Neon Stability
## Problem Statement
Neon needs robust stability testing to ensure reliability for users. The random operations test addresses this by continuously exercising the public API with unpredictable sequences of operations, helping to identify edge cases and potential issues that deterministic tests might not catch.
### Key Components
#### 1. Class Structure
The test implements three main classes to model the Neon architecture:
- **NeonProject**: Represents a Neon project and manages the lifecycle of branches and endpoints
- **NeonBranch**: Represents a branch within a project, with methods for creating child branches, endpoints, and performing point-in-time restores
- **NeonEndpoint**: Represents an endpoint (connection point) for a branch, with methods for managing benchmarks
#### 2. Operations Tested
The test randomly performs the following operations with weighted probabilities (a minimal sketch of the weighted selection follows the list):
- **Creating branches**
- **Deleting branches**
- **Adding read-only endpoints**
- **Deleting read-only endpoints**
- **Restoring branches to random points in time**
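As a rough illustration of how such weighted selection can work, the sketch below uses `random.choices`; the action names mirror the ones used by the test, but the weights themselves are hypothetical, not the values used in CI.

```python
import random

# Hypothetical weights -- illustrative only, not the values used by the test.
ACTIONS = {
    "new_branch": 4,
    "new_ro_endpoint": 3,
    "delete_branch": 2,
    "delete_ro_endpoint": 2,
    "restore_random_time": 1,
}

def pick_action() -> str:
    # random.choices draws with per-item weights and returns a list; take the single draw.
    actions, weights = zip(*ACTIONS.items())
    return random.choices(actions, weights=weights, k=1)[0]
```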
#### 3. Load Generation
Each branch and endpoint is loaded with `pgbench` to simulate real database workloads during testing. This ensures that the operations are performed against branches with actual data and ongoing transactions.
#### 4. Error Handling
The test includes robust error handling for various scenarios:
- Branch limit exceeded
- Connection timeouts
- Control plane timeouts (HTTP 524 errors)
- Benchmark failures
#### 5. CI Integration
The test is integrated into the CI pipeline via a GitHub workflow that runs daily, ensuring continuous validation of API stability.
## How It Works
1. The test creates a Neon project using the Public API
2. It initializes the main branch with pgbench data
3. It performs random operations according to the weighted probabilities (see the loop sketch after this list)
4. During each operation, it checks that all running benchmarks are still operational
5. The test cleans up by deleting the project at the end
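A minimal sketch of that loop, assuming the `pick_action` helper sketched above and the `do_action`/`NeonProject` definitions shown later in this document; the default operation count here is illustrative:

```python
import os
import random

def run_random_ops(project, num_operations=None):
    # Steps 3-4: perform weighted random operations, checking benchmarks after each one.
    random.seed(os.getenv("RANDOM_SEED"))  # reproducible runs when RANDOM_SEED is set
    num_operations = num_operations or int(os.getenv("NUM_OPERATIONS", "250"))
    for _ in range(num_operations):
        do_action(project, pick_action())
        project.check_all_benchmarks()
    # Step 5: cleanup (project deletion) is handled by the setup_class fixture.
```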
## Configuration
The test can be configured with:
- `RANDOM_SEED`: Set a specific random seed for reproducible test runs
- `NEON_API_KEY`: API key for authentication
- `NEON_API_BASE_URL`: Base URL for the API (defaults to staging environment)
- `NUM_OPERATIONS`: The number of operations to be performed
## Running the Test
The test is designed to run in the CI environment but can also be executed locally:
```bash
NEON_API_KEY=your_api_key ./scripts/pytest test_runner/random_ops/test_random_ops.py -m remote_cluster
```
To run with a specific random seed for reproducibility:
```bash
RANDOM_SEED=12345 NEON_API_KEY=your_api_key ./scripts/pytest test_runner/random_ops/test_random_ops.py -m remote_cluster
```
To run with the custom number of operations:
```bash
NUM_OPERATIONS=500 NEON_API_KEY=your_api_key ./scripts/pytest test_runner/random_ops/test_random_ops.py -m remote_cluster
```
## Benefits
This test provides several key benefits:
1. **Comprehensive API testing**: Exercises multiple API endpoints in combination
2. **Edge case discovery**: Random sequences may uncover issues not found in deterministic tests
3. **Stability validation**: Continuous execution helps ensure long-term API reliability
4. **Regression prevention**: Detects if new changes break existing API functionality
## Future Improvements
Potential enhancements to the test could include:
1. Adding more API operations, e.g. `reset_to_parent`, `snapshot`, etc.
2. Implementing more sophisticated load patterns
3. Adding metrics collection to measure API performance
4. Extending test duration for longer-term stability validation

View File

@@ -0,0 +1,463 @@
"""
Run the random API tests on the cloud instance of Neon
"""
from __future__ import annotations
import os
import random
import subprocess
import time
from datetime import UTC, datetime, timedelta
from typing import TYPE_CHECKING, Any
import pytest
from fixtures.log_helper import log
from requests import HTTPError
if TYPE_CHECKING:
from pathlib import Path
from fixtures.neon_api import NeonAPI
from fixtures.neon_fixtures import PgBin
from fixtures.pg_version import PgVersion
class NeonEndpoint:
"""
Neon Endpoint
Wraps the output of the endpoint creation API call
"""
def __init__(self, project: NeonProject, endpoint: dict[str, Any]):
self.project: NeonProject = project
self.id: str = endpoint["id"]
# The branch this endpoint belongs to
self.branch: NeonBranch = project.branches[endpoint["branch_id"]]
self.type: str = endpoint["type"]
# add itself to the list of endpoints of the branch
self.branch.endpoints[self.id] = self
self.project.endpoints[self.id] = self
self.host: str = endpoint["host"]
self.benchmark: subprocess.Popen[Any] | None = None
# The connection environment is used when running benchmarks
self.connect_env: dict[str, str] | None = None
if self.branch.connect_env:
self.connect_env = self.branch.connect_env.copy()
self.connect_env["PGHOST"] = self.host
def delete(self):
self.project.delete_endpoint(self.id)
def start_benchmark(self, clients=10):
return self.project.start_benchmark(self.id, clients=clients)
def check_benchmark(self):
self.project.check_benchmark(self.id)
def terminate_benchmark(self):
self.project.terminate_benchmark(self.id)
class NeonBranch:
"""
Neon Branch
Takes the output of the Neon Public API branch creation call as the first parameter.
is_reset defines whether the branch is a reset one, i.e. created as a result of the reset API call.
"""
def __init__(self, project, branch: dict[str, Any], is_reset=False):
self.id: str = branch["branch"]["id"]
self.desc = branch
self.project: NeonProject = project
self.neon_api: NeonAPI = project.neon_api
self.project_id: str = branch["branch"]["project_id"]
self.parent: NeonBranch | None = (
self.project.branches[branch["branch"]["parent_id"]]
if "parent_id" in branch["branch"]
else None
)
if is_reset:
self.project.reset_branches.add(self.id)
elif self.parent:
self.project.leaf_branches[self.id] = self
if self.parent is not None and self.parent.id in self.project.leaf_branches:
self.project.leaf_branches.pop(self.parent.id)
self.project.branches[self.id] = self
self.children: dict[str, NeonBranch] = {}
if self.parent is not None:
self.parent.children[self.id] = self
self.endpoints: dict[str, NeonEndpoint] = {}
self.connection_parameters: dict[str, str] | None = (
branch["connection_uris"][0]["connection_parameters"]
if "connection_uris" in branch
else None
)
self.benchmark: subprocess.Popen[Any] | None = None
self.updated_at: datetime = datetime.fromisoformat(branch["branch"]["updated_at"])
self.connect_env: dict[str, str] | None = None
if self.connection_parameters:
self.connect_env = {
"PGHOST": self.connection_parameters["host"],
"PGUSER": self.connection_parameters["role"],
"PGDATABASE": self.connection_parameters["database"],
"PGPASSWORD": self.connection_parameters["password"],
"PGSSLMODE": "require",
}
def __str__(self):
"""
Renders the branch's id together with all of its predecessors
(r) means the branch is a reset one
"""
return f"{self.id}{'(r)' if self.id in self.project.reset_branches else ''}, parent: {self.parent}"
def create_child_branch(self) -> NeonBranch | None:
return self.project.create_branch(self.id)
def create_ro_endpoint(self) -> NeonEndpoint:
return NeonEndpoint(
self.project,
self.neon_api.create_endpoint(self.project_id, self.id, "read_only", {})["endpoint"],
)
def delete(self) -> None:
self.project.delete_branch(self.id)
def start_benchmark(self, clients=10) -> subprocess.Popen[Any]:
return self.project.start_benchmark(self.id, clients=clients)
def check_benchmark(self) -> None:
self.project.check_benchmark(self.id)
def terminate_benchmark(self) -> None:
self.project.terminate_benchmark(self.id)
def restore_random_time(self) -> None:
"""
Performs PITR, i.e. calls the reset API on the same branch, targeting a random time in the past
"""
min_time = self.updated_at + timedelta(seconds=1)
max_time = datetime.now(UTC) - timedelta(seconds=1)
target_time = (min_time + (max_time - min_time) * random.random()).replace(microsecond=0)
res = self.restore(
self.id,
source_timestamp=target_time.isoformat().replace("+00:00", "Z"),
preserve_under_name=self.project.gen_restore_name(),
)
if res is None:
return
self.updated_at = datetime.fromisoformat(res["branch"]["updated_at"])
parent_id: str = res["branch"]["parent_id"]
# Creates an object for the parent branch
# After the reset operation a new parent branch is created
parent = NeonBranch(
self.project, self.neon_api.get_branch_details(self.project_id, parent_id), True
)
self.project.branches[parent_id] = parent
self.parent = parent
parent.children[self.id] = self
self.project.wait()
def restore(
self,
source_branch_id: str,
source_lsn: str | None = None,
source_timestamp: str | None = None,
preserve_under_name: str | None = None,
) -> dict[str, Any] | None:
endpoints = [ep for ep in self.endpoints.values() if ep.type == "read_only"]
# Terminate all running benchmarks first to avoid spurious failures; pgbench errors during the restore are expected otherwise
for ep in endpoints:
ep.terminate_benchmark()
self.terminate_benchmark()
try:
res: dict[str, Any] = self.neon_api.restore_branch(
self.project_id,
self.id,
source_branch_id,
source_lsn,
source_timestamp,
preserve_under_name,
)
except HTTPError as he:
if (
he.response.status_code == 422
and he.response.json()["code"] == "BRANCHES_LIMIT_EXCEEDED"
):
log.info("Branch limit exceeded, skipping")
return None
else:
raise HTTPError(he) from he
self.project.wait()
self.start_benchmark()
for ep in endpoints:
ep.start_benchmark()
return res
class NeonProject:
"""
The project object
Calls the Public API to create a Neon Project
"""
def __init__(self, neon_api: NeonAPI, pg_bin: PgBin, pg_version: PgVersion):
self.neon_api = neon_api
self.pg_bin = pg_bin
proj = self.neon_api.create_project(
pg_version, f"Automatic random API test {os.getenv('GITHUB_RUN_ID')}"
)
self.id: str = proj["project"]["id"]
self.name: str = proj["project"]["name"]
self.connection_uri: str = proj["connection_uris"][0]["connection_uri"]
self.connection_parameters: dict[str, str] = proj["connection_uris"][0][
"connection_parameters"
]
self.pg_version: PgVersion = pg_version
# Leaf branches are branches that do not have children
self.leaf_branches: dict[str, NeonBranch] = {}
self.branches: dict[str, NeonBranch] = {}
self.reset_branches: set[str] = set()
self.main_branch: NeonBranch = NeonBranch(self, proj)
self.main_branch.connection_parameters = self.connection_parameters
self.endpoints: dict[str, NeonEndpoint] = {}
for endpoint in proj["endpoints"]:
NeonEndpoint(self, endpoint)
self.neon_api.wait_for_operation_to_finish(self.id)
self.benchmarks: dict[str, subprocess.Popen[Any]] = {}
self.restore_num: int = 0
self.restart_pgbench_on_console_errors: bool = False
def delete(self):
self.neon_api.delete_project(self.id)
def create_branch(self, parent_id: str | None = None) -> NeonBranch | None:
self.wait()
try:
branch_def = self.neon_api.create_branch(self.id, parent_id=parent_id)
except HTTPError as he:
if (
he.response.status_code == 422
and he.response.json()["code"] == "BRANCHES_LIMIT_EXCEEDED"
):
log.info("Branch limit exceeded, skipping")
return None
else:
raise HTTPError(he) from he
new_branch = NeonBranch(self, branch_def)
self.wait()
return new_branch
def delete_branch(self, branch_id: str) -> None:
parent = self.branches[branch_id].parent
if not parent or branch_id == self.main_branch.id:
raise RuntimeError("Cannot delete the main branch")
if branch_id not in self.leaf_branches and branch_id not in self.reset_branches:
raise RuntimeError(f"The branch {branch_id}, probably, has ancestors")
if branch_id not in self.branches:
raise RuntimeError(f"The branch with id {branch_id} is not found")
endpoints_to_delete = [
ep for ep in self.branches[branch_id].endpoints.values() if ep.type == "read_only"
]
for ep in endpoints_to_delete:
ep.delete()
if branch_id not in self.reset_branches:
self.terminate_benchmark(branch_id)
self.neon_api.delete_branch(self.id, branch_id)
if len(parent.children) == 1 and parent.id != self.main_branch.id:
self.leaf_branches[parent.id] = parent
parent.children.pop(branch_id)
if branch_id in self.leaf_branches:
self.leaf_branches.pop(branch_id)
else:
self.reset_branches.remove(branch_id)
self.branches.pop(branch_id)
self.wait()
if parent.id in self.reset_branches:
parent.delete()
def delete_endpoint(self, endpoint_id: str) -> None:
self.terminate_benchmark(endpoint_id)
self.neon_api.delete_endpoint(self.id, endpoint_id)
self.endpoints[endpoint_id].branch.endpoints.pop(endpoint_id)
self.endpoints.pop(endpoint_id)
self.wait()
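# start_benchmark runs a long pgbench session against a branch or an endpoint:
# -c<clients> concurrent clients, -T10800 keeps it running for three hours and
# -Mprepared uses prepared statements; read-only endpoints additionally get
# -S (built-in select-only script) and -n (skip vacuum).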
def start_benchmark(self, target: str, clients: int = 10) -> subprocess.Popen[Any]:
if target in self.benchmarks:
raise RuntimeError(f"Benchmark was already started for {target}")
is_endpoint = target.startswith("ep")
read_only = is_endpoint and self.endpoints[target].type == "read_only"
cmd = ["pgbench", f"-c{clients}", "-T10800", "-Mprepared"]
if read_only:
cmd.extend(["-S", "-n"])
target_object = self.endpoints[target] if is_endpoint else self.branches[target]
if target_object.connect_env is None:
raise RuntimeError(f"The connection environment is not defined for {target}")
log.info(
"running pgbench on %s, cmd: %s, host: %s",
target,
cmd,
target_object.connect_env["PGHOST"],
)
pgbench = self.pg_bin.run_nonblocking(
cmd, env=target_object.connect_env, stderr_pipe=subprocess.PIPE
)
self.benchmarks[target] = pgbench
target_object.benchmark = pgbench
time.sleep(2)
return pgbench
def check_all_benchmarks(self) -> None:
for target in tuple(self.benchmarks.keys()):
self.check_benchmark(target)
def check_benchmark(self, target: str) -> None:
rc = self.benchmarks[target].poll()
if rc is not None:
_, err = self.benchmarks[target].communicate()
log.error("STDERR: %s", err)
# if the benchmark failed because the control plane was unresponsive,
# just restart it
if self.restart_pgbench_on_console_errors and (
"ERROR: Couldn't connect to compute node" in err
or "ERROR: Console request failed" in err
):
log.info("Restarting benchmark for %s", target)
self.benchmarks.pop(target)
self.start_benchmark(target)
return
raise RuntimeError(f"The benchmark for {target} ended with code {rc}")
def terminate_benchmark(self, target: str) -> None:
log.info("Terminating the benchmark %s", target)
is_endpoint = target.startswith("ep")
self.check_benchmark(target)
self.benchmarks[target].terminate()
self.benchmarks.pop(target)
if is_endpoint:
self.endpoints[target].benchmark = None
else:
self.branches[target].benchmark = None
def wait(self):
"""
Wait for all the operations to be finished
"""
return self.neon_api.wait_for_operation_to_finish(self.id)
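# NeonAPI.wait_for_operation_to_finish is not shown here; presumably it polls
# the project's operation list until nothing is pending, which is why most
# mutating calls in this class are followed by a wait().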
def gen_restore_name(self):
self.restore_num += 1
return f"restore{self.restore_num}"
@pytest.fixture()
def setup_class(
pg_version: PgVersion,
pg_bin: PgBin,
neon_api: NeonAPI,
):
neon_api.retry_if_possible = True
project = NeonProject(neon_api, pg_bin, pg_version)
log.info("Created a project with id %s, name %s", project.id, project.name)
yield pg_bin, project
log.info("Retried 524 errors: %s", neon_api.retries524)
log.info("Retried 4xx errors: %s", neon_api.retries4xx)
if neon_api.retries524 > 0:
print(f"::warning::Retried on 524 error {neon_api.retries524} times")
if neon_api.retries4xx > 0:
print(f"::warning::Retried on 4xx error {neon_api.retries4xx} times")
log.info("Removing the project")
project.delete()
def do_action(project: NeonProject, action: str) -> None:
"""
Runs the action
"""
log.info("Action: %s", action)
if action == "new_branch":
log.info("Trying to create a new branch")
parent = project.branches[
random.choice(list(set(project.branches.keys()) - project.reset_branches))
]
log.info("Parent: %s", parent)
child = parent.create_child_branch()
if child is None:
return
log.info("Created branch %s", child)
child.start_benchmark()
elif action == "delete_branch":
if project.leaf_branches:
target = random.choice(list(project.leaf_branches.values()))
log.info("Trying to delete branch %s", target)
target.delete()
else:
log.info("Leaf branches not found, skipping")
elif action == "new_ro_endpoint":
ep = random.choice(
[br for br in project.branches.values() if br.id not in project.reset_branches]
).create_ro_endpoint()
log.info("Created the RO endpoint with id %s branch: %s", ep.id, ep.branch.id)
ep.start_benchmark()
elif action == "delete_ro_endpoint":
ro_endpoints: list[NeonEndpoint] = [
endpoint for endpoint in project.endpoints.values() if endpoint.type == "read_only"
]
if ro_endpoints:
target_ep: NeonEndpoint = random.choice(ro_endpoints)
target_ep.delete()
log.info("endpoint %s deleted", target_ep.id)
else:
log.info("no read_only endpoints present, skipping")
elif action == "restore_random_time":
if project.leaf_branches:
br: NeonBranch = random.choice(list(project.leaf_branches.values()))
log.info("Restore %s", br)
br.restore_random_time()
else:
log.info("No leaf branches found")
else:
raise ValueError(f"The action {action} is unknown")
@pytest.mark.timeout(7200)
@pytest.mark.remote_cluster
def test_api_random(
setup_class,
pg_distrib_dir: Path,
test_output_dir: Path,
):
"""
Run the random API tests
"""
seed = int(os.getenv("RANDOM_SEED", "0"))
if seed == 0:
seed = int(time.time())
log.info("Using random seed: %s", seed)
random.seed(seed)
pg_bin, project = setup_class
# Here we can assign relative weights to each action (see the note after the tuple)
ACTIONS = (
("new_branch", 1.5),
("new_ro_endpoint", 1.4),
("delete_ro_endpoint", 0.8),
("delete_branch", 1.0),
("restore_random_time", 1.2),
)
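# random.choices() below draws one action per iteration using these relative
# weights; they do not have to sum to 1. With the values above the total is
# 5.9, so e.g. "new_branch" is picked with probability 1.5 / 5.9, roughly 25%.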
num_operations = int(os.getenv("NUM_OPERATIONS", "250"))
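# Initialize pgbench on the main branch: -I dtGvp runs the init steps drop
# tables, create tables, server-side data generation, vacuum and primary keys;
# scale factor 100 is roughly 1.5 GB of data.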
pg_bin.run(["pgbench", "-i", "-I", "dtGvp", "-s100"], env=project.main_branch.connect_env)
for i in range(num_operations):
log.info("Starting action #%s", i + 1)
do_action(
project, random.choices([a[0] for a in ACTIONS], weights=[w[1] for w in ACTIONS])[0]
)
project.check_all_benchmarks()
assert True


@@ -4114,13 +4114,29 @@ def test_storage_controller_location_conf_equivalence(neon_env_builder: NeonEnvB
assert reconciles_after_restart == 0
class RestartStorcon(Enum):
RESTART = "restart"
ONLINE = "online"
class DeletionSubject(Enum):
TIMELINE = "timeline"
TENANT = "tenant"
@run_only_on_default_postgres("PG version is not interesting here")
@pytest.mark.parametrize("restart_storcon", [True, False])
def test_storcon_create_delete_sk_down(neon_env_builder: NeonEnvBuilder, restart_storcon: bool):
@pytest.mark.parametrize("restart_storcon", [RestartStorcon.RESTART, RestartStorcon.ONLINE])
@pytest.mark.parametrize("deletetion_subject", [DeletionSubject.TENANT, DeletionSubject.TIMELINE])
def test_storcon_create_delete_sk_down(
neon_env_builder: NeonEnvBuilder,
restart_storcon: RestartStorcon,
deletion_subject: DeletionSubject,
):
"""
Test that the storcon can create and delete tenants and timelines with a safekeeper being down.
- restart_storcon: tests whether the pending ops are persisted.
- restart_storcon: tests that the pending ops are persisted.
If we don't restart, we test that the pending op doesn't have to be re-read from the db.
- deletion_subject: tests that both single-timeline and whole-tenant deletion work.
"""
neon_env_builder.num_safekeepers = 3
@@ -4143,6 +4159,7 @@ def test_storcon_create_delete_sk_down(neon_env_builder: NeonEnvBuilder, restart
tenant_id = TenantId.generate()
timeline_id = TimelineId.generate()
env.create_tenant(tenant_id, timeline_id)
child_timeline_id = env.create_branch("child_of_main", tenant_id)
env.safekeepers[1].assert_log_contains(f"creating new timeline {tenant_id}/{timeline_id}")
env.safekeepers[2].assert_log_contains(f"creating new timeline {tenant_id}/{timeline_id}")
@@ -4155,7 +4172,7 @@ def test_storcon_create_delete_sk_down(neon_env_builder: NeonEnvBuilder, restart
]
)
if restart_storcon:
if restart_storcon == RestartStorcon.RESTART:
# Restart the storcon to check that we persist operations
env.storage_controller.stop()
env.storage_controller.start()
@@ -4168,6 +4185,13 @@ def test_storcon_create_delete_sk_down(neon_env_builder: NeonEnvBuilder, restart
ep.start(safekeeper_generation=1, safekeepers=[1, 2, 3])
ep.safe_psql("CREATE TABLE IF NOT EXISTS t(key int, value text)")
with env.endpoints.create(
"child_of_main", tenant_id=tenant_id, config_lines=config_lines
) as ep:
# endpoint should start.
ep.start(safekeeper_generation=1, safekeepers=[1, 2, 3])
ep.safe_psql("CREATE TABLE IF NOT EXISTS t(key int, value text)")
env.storage_controller.assert_log_contains("writing pending op for sk id 1")
env.safekeepers[0].start()
@@ -4176,25 +4200,31 @@ def test_storcon_create_delete_sk_down(neon_env_builder: NeonEnvBuilder, restart
env.safekeepers[0].assert_log_contains(
f"pulling timeline {tenant_id}/{timeline_id} from safekeeper"
)
env.safekeepers[0].assert_log_contains(
f"pulling timeline {tenant_id}/{child_timeline_id} from safekeeper"
)
wait_until(logged_contains_on_sk)
env.safekeepers[1].stop()
env.storage_controller.pageserver_api().tenant_delete(tenant_id)
if deletion_subject == DeletionSubject.TENANT:
env.storage_controller.pageserver_api().tenant_delete(tenant_id)
else:
env.storage_controller.pageserver_api().timeline_delete(tenant_id, child_timeline_id)
# ensure the safekeeper deleted the timeline
def timeline_deleted_on_active_sks():
env.safekeepers[0].assert_log_contains(
f"deleting timeline {tenant_id}/{timeline_id} from disk"
f"deleting timeline {tenant_id}/{child_timeline_id} from disk"
)
env.safekeepers[2].assert_log_contains(
f"deleting timeline {tenant_id}/{timeline_id} from disk"
f"deleting timeline {tenant_id}/{child_timeline_id} from disk"
)
wait_until(timeline_deleted_on_active_sks)
if restart_storcon:
if restart_storcon == RestartStorcon.RESTART:
# Restart the storcon to check that we persist operations
env.storage_controller.stop()
env.storage_controller.start()
@@ -4204,7 +4234,7 @@ def test_storcon_create_delete_sk_down(neon_env_builder: NeonEnvBuilder, restart
# ensure that there is log msgs for the third safekeeper too
def timeline_deleted_on_sk():
env.safekeepers[1].assert_log_contains(
f"deleting timeline {tenant_id}/{timeline_id} from disk"
f"deleting timeline {tenant_id}/{child_timeline_id} from disk"
)
wait_until(timeline_deleted_on_sk)