mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-27 07:10:37 +00:00
Compare commits
7 Commits
cloneable/
...
skyzh/uplo
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c32c26d741 | ||
|
|
6da5c189b5 | ||
|
|
66c01e46ca | ||
|
|
94fe9d00f3 | ||
|
|
dc19960cbf | ||
|
|
d3b654f6db | ||
|
|
4a898cfa2c |
@@ -349,7 +349,7 @@ jobs:
|
||||
contents: read
|
||||
statuses: write
|
||||
needs: [ build-neon ]
|
||||
runs-on: ${{ fromJSON(format('["self-hosted", "{0}"]', inputs.arch == 'arm64' && 'large-arm64' || 'large-metal')) }}
|
||||
runs-on: ${{ fromJSON(format('["self-hosted", "{0}"]', inputs.arch == 'arm64' && 'large-arm64' || 'large')) }}
|
||||
container:
|
||||
image: ${{ inputs.build-tools-image }}
|
||||
credentials:
|
||||
|
||||
2
.github/workflows/_meta.yml
vendored
2
.github/workflows/_meta.yml
vendored
@@ -165,5 +165,5 @@ jobs:
|
||||
GH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
|
||||
CURRENT_SHA: ${{ github.sha }}
|
||||
run: |
|
||||
RELEASE_PR_RUN_ID=$(gh api "/repos/${GITHUB_REPOSITORY}/actions/runs?head_sha=$CURRENT_SHA" | jq '[.workflow_runs[] | select(.name == "Build and Test") | select(.head_branch | test("^rc/release.*$"; "s"))] | first | .id // ("Failed to find Build and Test run from RC PR!" | halt_error(1))')
|
||||
RELEASE_PR_RUN_ID=$(gh api "/repos/${GITHUB_REPOSITORY}/actions/runs?head_sha=$CURRENT_SHA" | jq '[.workflow_runs[] | select(.name == "Build and Test") | select(.head_branch | test("^rc/release(-(proxy|compute))?/[0-9]{4}-[0-9]{2}-[0-9]{2}$"; "s"))] | first | .id // ("Failed to find Build and Test run from RC PR!" | halt_error(1))')
|
||||
echo "release-pr-run-id=$RELEASE_PR_RUN_ID" | tee -a $GITHUB_OUTPUT
|
||||
|
||||
8
.github/workflows/fast-forward.yml
vendored
8
.github/workflows/fast-forward.yml
vendored
@@ -27,17 +27,15 @@ jobs:
|
||||
- name: Fast forwarding
|
||||
uses: sequoia-pgp/fast-forward@ea7628bedcb0b0b96e94383ada458d812fca4979
|
||||
# See https://docs.github.com/en/graphql/reference/enums#mergestatestatus
|
||||
if: ${{ contains(fromJSON('["clean", "unstable"]'), github.event.pull_request.mergeable_state) }}
|
||||
if: ${{ github.event.pull_request.mergeable_state == 'clean' }}
|
||||
with:
|
||||
merge: true
|
||||
comment: on-error
|
||||
github_token: ${{ secrets.CI_ACCESS_TOKEN }}
|
||||
|
||||
- name: Comment if mergeable_state is not clean
|
||||
if: ${{ !contains(fromJSON('["clean", "unstable"]'), github.event.pull_request.mergeable_state) }}
|
||||
env:
|
||||
GH_TOKEN: ${{ secrets.CI_ACCESS_TOKEN }}
|
||||
if: ${{ github.event.pull_request.mergeable_state != 'clean' }}
|
||||
run: |
|
||||
gh pr comment ${{ github.event.pull_request.number }} \
|
||||
--repo "${GITHUB_REPOSITORY}" \
|
||||
--body "Not trying to forward pull-request, because \`mergeable_state\` is \`${{ github.event.pull_request.mergeable_state }}\`, not \`clean\` or \`unstable\`."
|
||||
--body "Not trying to forward pull-request, because \`mergeable_state\` is \`${{ github.event.pull_request.mergeable_state }}\`, not \`clean\`."
|
||||
|
||||
93
.github/workflows/random-ops-test.yml
vendored
93
.github/workflows/random-ops-test.yml
vendored
@@ -1,93 +0,0 @@
|
||||
name: Random Operations Test
|
||||
|
||||
on:
|
||||
schedule:
|
||||
# * is a special character in YAML so you have to quote this string
|
||||
# ┌───────────── minute (0 - 59)
|
||||
# │ ┌───────────── hour (0 - 23)
|
||||
# │ │ ┌───────────── day of the month (1 - 31)
|
||||
# │ │ │ ┌───────────── month (1 - 12 or JAN-DEC)
|
||||
# │ │ │ │ ┌───────────── day of the week (0 - 6 or SUN-SAT)
|
||||
- cron: '23 */2 * * *' # runs every 2 hours
|
||||
workflow_dispatch:
|
||||
inputs:
|
||||
random_seed:
|
||||
type: number
|
||||
description: 'The random seed'
|
||||
required: false
|
||||
default: 0
|
||||
num_operations:
|
||||
type: number
|
||||
description: "The number of operations to test"
|
||||
default: 250
|
||||
|
||||
defaults:
|
||||
run:
|
||||
shell: bash -euxo pipefail {0}
|
||||
|
||||
permissions: {}
|
||||
|
||||
env:
|
||||
DEFAULT_PG_VERSION: 16
|
||||
PLATFORM: neon-captest-new
|
||||
AWS_DEFAULT_REGION: eu-central-1
|
||||
|
||||
jobs:
|
||||
run-random-rests:
|
||||
env:
|
||||
POSTGRES_DISTRIB_DIR: /tmp/neon/pg_install
|
||||
runs-on: small
|
||||
permissions:
|
||||
id-token: write
|
||||
statuses: write
|
||||
|
||||
strategy:
|
||||
fail-fast: false
|
||||
matrix:
|
||||
pg-version: [16, 17]
|
||||
|
||||
container:
|
||||
image: ghcr.io/neondatabase/build-tools:pinned-bookworm
|
||||
credentials:
|
||||
username: ${{ github.actor }}
|
||||
password: ${{ secrets.GITHUB_TOKEN }}
|
||||
options: --init
|
||||
steps:
|
||||
- name: Harden the runner (Audit all outbound calls)
|
||||
uses: step-security/harden-runner@4d991eb9b905ef189e4c376166672c3f2f230481 # v2.11.0
|
||||
with:
|
||||
egress-policy: audit
|
||||
|
||||
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
|
||||
|
||||
- name: Download Neon artifact
|
||||
uses: ./.github/actions/download
|
||||
with:
|
||||
name: neon-${{ runner.os }}-${{ runner.arch }}-release-artifact
|
||||
path: /tmp/neon/
|
||||
prefix: latest
|
||||
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
|
||||
|
||||
- name: Run tests
|
||||
uses: ./.github/actions/run-python-test-set
|
||||
with:
|
||||
build_type: remote
|
||||
test_selection: random_ops
|
||||
run_in_parallel: false
|
||||
extra_params: -m remote_cluster
|
||||
pg_version: ${{ matrix.pg-version }}
|
||||
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
|
||||
env:
|
||||
NEON_API_KEY: ${{ secrets.NEON_STAGING_API_KEY }}
|
||||
RANDOM_SEED: ${{ inputs.random_seed }}
|
||||
NUM_OPERATIONS: ${{ inputs.num_operations }}
|
||||
|
||||
- name: Create Allure report
|
||||
if: ${{ !cancelled() }}
|
||||
id: create-allure-report
|
||||
uses: ./.github/actions/allure-report-generate
|
||||
with:
|
||||
store-test-results-into-db: true
|
||||
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
|
||||
env:
|
||||
REGRESS_TEST_RESULT_CONNSTR_NEW: ${{ secrets.REGRESS_TEST_RESULT_CONNSTR_NEW }}
|
||||
16
Cargo.lock
generated
16
Cargo.lock
generated
@@ -4251,6 +4251,7 @@ dependencies = [
|
||||
"arc-swap",
|
||||
"async-compression",
|
||||
"async-stream",
|
||||
"base64 0.13.1",
|
||||
"bincode",
|
||||
"bit_field",
|
||||
"byteorder",
|
||||
@@ -4285,7 +4286,6 @@ dependencies = [
|
||||
"pageserver_api",
|
||||
"pageserver_client",
|
||||
"pageserver_compaction",
|
||||
"pem",
|
||||
"pin-project-lite",
|
||||
"postgres-protocol",
|
||||
"postgres-types",
|
||||
@@ -4299,6 +4299,7 @@ dependencies = [
|
||||
"rand 0.8.5",
|
||||
"range-set-blaze",
|
||||
"regex",
|
||||
"remote_keys",
|
||||
"remote_storage",
|
||||
"reqwest",
|
||||
"rpds",
|
||||
@@ -5504,6 +5505,16 @@ version = "1.9.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c707298afce11da2efef2f600116fa93ffa7a032b5d7b628aa17711ec81383ca"
|
||||
|
||||
[[package]]
|
||||
name = "remote_keys"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"rand 0.8.5",
|
||||
"utils",
|
||||
"workspace_hack",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "remote_storage"
|
||||
version = "0.1.0"
|
||||
@@ -5519,6 +5530,7 @@ dependencies = [
|
||||
"azure_identity",
|
||||
"azure_storage",
|
||||
"azure_storage_blobs",
|
||||
"base64 0.13.1",
|
||||
"bytes",
|
||||
"camino",
|
||||
"camino-tempfile",
|
||||
@@ -5529,6 +5541,7 @@ dependencies = [
|
||||
"humantime-serde",
|
||||
"hyper 1.4.1",
|
||||
"itertools 0.10.5",
|
||||
"md5",
|
||||
"metrics",
|
||||
"once_cell",
|
||||
"pin-project-lite",
|
||||
@@ -6002,7 +6015,6 @@ dependencies = [
|
||||
"once_cell",
|
||||
"pageserver_api",
|
||||
"parking_lot 0.12.1",
|
||||
"pem",
|
||||
"postgres-protocol",
|
||||
"postgres_backend",
|
||||
"postgres_ffi",
|
||||
|
||||
@@ -30,6 +30,7 @@ members = [
|
||||
"libs/tenant_size_model",
|
||||
"libs/metrics",
|
||||
"libs/postgres_connection",
|
||||
"libs/remote_keys",
|
||||
"libs/remote_storage",
|
||||
"libs/tracing-utils",
|
||||
"libs/postgres_ffi/wal_craft",
|
||||
@@ -255,6 +256,7 @@ postgres_ffi = { version = "0.1", path = "./libs/postgres_ffi/" }
|
||||
postgres_initdb = { path = "./libs/postgres_initdb" }
|
||||
pq_proto = { version = "0.1", path = "./libs/pq_proto/" }
|
||||
remote_storage = { version = "0.1", path = "./libs/remote_storage/" }
|
||||
remote_keys = { version = "0.1", path = "./libs/remote_keys/" }
|
||||
safekeeper_api = { version = "0.1", path = "./libs/safekeeper_api" }
|
||||
safekeeper_client = { path = "./safekeeper/client" }
|
||||
desim = { version = "0.1", path = "./libs/desim" }
|
||||
|
||||
265
compute/patches/pg_anon.patch
Normal file
265
compute/patches/pg_anon.patch
Normal file
@@ -0,0 +1,265 @@
|
||||
commit 00aa659afc9c7336ab81036edec3017168aabf40
|
||||
Author: Heikki Linnakangas <heikki@neon.tech>
|
||||
Date: Tue Nov 12 16:59:19 2024 +0200
|
||||
|
||||
Temporarily disable test that depends on timezone
|
||||
|
||||
diff --git a/tests/expected/generalization.out b/tests/expected/generalization.out
|
||||
index 23ef5fa..9e60deb 100644
|
||||
--- a/ext-src/pg_anon-src/tests/expected/generalization.out
|
||||
+++ b/ext-src/pg_anon-src/tests/expected/generalization.out
|
||||
@@ -284,12 +284,9 @@ SELECT anon.generalize_tstzrange('19041107','century');
|
||||
["Tue Jan 01 00:00:00 1901 PST","Mon Jan 01 00:00:00 2001 PST")
|
||||
(1 row)
|
||||
|
||||
-SELECT anon.generalize_tstzrange('19041107','millennium');
|
||||
- generalize_tstzrange
|
||||
------------------------------------------------------------------
|
||||
- ["Thu Jan 01 00:00:00 1001 PST","Mon Jan 01 00:00:00 2001 PST")
|
||||
-(1 row)
|
||||
-
|
||||
+-- temporarily disabled, see:
|
||||
+-- https://gitlab.com/dalibo/postgresql_anonymizer/-/commit/199f0a392b37c59d92ae441fb8f037e094a11a52#note_2148017485
|
||||
+--SELECT anon.generalize_tstzrange('19041107','millennium');
|
||||
-- generalize_daterange
|
||||
SELECT anon.generalize_daterange('19041107');
|
||||
generalize_daterange
|
||||
diff --git a/tests/sql/generalization.sql b/tests/sql/generalization.sql
|
||||
index b868344..b4fc977 100644
|
||||
--- a/ext-src/pg_anon-src/tests/sql/generalization.sql
|
||||
+++ b/ext-src/pg_anon-src/tests/sql/generalization.sql
|
||||
@@ -61,7 +61,9 @@ SELECT anon.generalize_tstzrange('19041107','month');
|
||||
SELECT anon.generalize_tstzrange('19041107','year');
|
||||
SELECT anon.generalize_tstzrange('19041107','decade');
|
||||
SELECT anon.generalize_tstzrange('19041107','century');
|
||||
-SELECT anon.generalize_tstzrange('19041107','millennium');
|
||||
+-- temporarily disabled, see:
|
||||
+-- https://gitlab.com/dalibo/postgresql_anonymizer/-/commit/199f0a392b37c59d92ae441fb8f037e094a11a52#note_2148017485
|
||||
+--SELECT anon.generalize_tstzrange('19041107','millennium');
|
||||
|
||||
-- generalize_daterange
|
||||
SELECT anon.generalize_daterange('19041107');
|
||||
|
||||
commit 7dd414ee75f2875cffb1d6ba474df1f135a6fc6f
|
||||
Author: Alexey Masterov <alexeymasterov@neon.tech>
|
||||
Date: Fri May 31 06:34:26 2024 +0000
|
||||
|
||||
These alternative expected files were added to consider the neon features
|
||||
|
||||
diff --git a/ext-src/pg_anon-src/tests/expected/permissions_masked_role_1.out b/ext-src/pg_anon-src/tests/expected/permissions_masked_role_1.out
|
||||
new file mode 100644
|
||||
index 0000000..2539cfd
|
||||
--- /dev/null
|
||||
+++ b/ext-src/pg_anon-src/tests/expected/permissions_masked_role_1.out
|
||||
@@ -0,0 +1,101 @@
|
||||
+BEGIN;
|
||||
+CREATE EXTENSION anon CASCADE;
|
||||
+NOTICE: installing required extension "pgcrypto"
|
||||
+SELECT anon.init();
|
||||
+ init
|
||||
+------
|
||||
+ t
|
||||
+(1 row)
|
||||
+
|
||||
+CREATE ROLE mallory_the_masked_user;
|
||||
+SECURITY LABEL FOR anon ON ROLE mallory_the_masked_user IS 'MASKED';
|
||||
+CREATE TABLE t1(i INT);
|
||||
+ALTER TABLE t1 ADD COLUMN t TEXT;
|
||||
+SECURITY LABEL FOR anon ON COLUMN t1.t
|
||||
+IS 'MASKED WITH VALUE NULL';
|
||||
+INSERT INTO t1 VALUES (1,'test');
|
||||
+--
|
||||
+-- We're checking the owner's permissions
|
||||
+--
|
||||
+-- see
|
||||
+-- https://postgresql-anonymizer.readthedocs.io/en/latest/SECURITY/#permissions
|
||||
+--
|
||||
+SET ROLE mallory_the_masked_user;
|
||||
+SELECT anon.pseudo_first_name(0) IS NOT NULL;
|
||||
+ ?column?
|
||||
+----------
|
||||
+ t
|
||||
+(1 row)
|
||||
+
|
||||
+-- SHOULD FAIL
|
||||
+DO $$
|
||||
+BEGIN
|
||||
+ PERFORM anon.init();
|
||||
+ EXCEPTION WHEN insufficient_privilege
|
||||
+ THEN RAISE NOTICE 'insufficient_privilege';
|
||||
+END$$;
|
||||
+NOTICE: insufficient_privilege
|
||||
+-- SHOULD FAIL
|
||||
+DO $$
|
||||
+BEGIN
|
||||
+ PERFORM anon.anonymize_table('t1');
|
||||
+ EXCEPTION WHEN insufficient_privilege
|
||||
+ THEN RAISE NOTICE 'insufficient_privilege';
|
||||
+END$$;
|
||||
+NOTICE: insufficient_privilege
|
||||
+-- SHOULD FAIL
|
||||
+SAVEPOINT fail_start_engine;
|
||||
+SELECT anon.start_dynamic_masking();
|
||||
+ERROR: Only supersusers can start the dynamic masking engine.
|
||||
+CONTEXT: PL/pgSQL function anon.start_dynamic_masking(boolean) line 18 at RAISE
|
||||
+ROLLBACK TO fail_start_engine;
|
||||
+RESET ROLE;
|
||||
+SELECT anon.start_dynamic_masking();
|
||||
+ start_dynamic_masking
|
||||
+-----------------------
|
||||
+ t
|
||||
+(1 row)
|
||||
+
|
||||
+SET ROLE mallory_the_masked_user;
|
||||
+SELECT * FROM mask.t1;
|
||||
+ i | t
|
||||
+---+---
|
||||
+ 1 |
|
||||
+(1 row)
|
||||
+
|
||||
+-- SHOULD FAIL
|
||||
+DO $$
|
||||
+BEGIN
|
||||
+ SELECT * FROM public.t1;
|
||||
+ EXCEPTION WHEN insufficient_privilege
|
||||
+ THEN RAISE NOTICE 'insufficient_privilege';
|
||||
+END$$;
|
||||
+NOTICE: insufficient_privilege
|
||||
+-- SHOULD FAIL
|
||||
+SAVEPOINT fail_stop_engine;
|
||||
+SELECT anon.stop_dynamic_masking();
|
||||
+ERROR: Only supersusers can stop the dynamic masking engine.
|
||||
+CONTEXT: PL/pgSQL function anon.stop_dynamic_masking() line 18 at RAISE
|
||||
+ROLLBACK TO fail_stop_engine;
|
||||
+RESET ROLE;
|
||||
+SELECT anon.stop_dynamic_masking();
|
||||
+NOTICE: The previous priviledges of 'mallory_the_masked_user' are not restored. You need to grant them manually.
|
||||
+ stop_dynamic_masking
|
||||
+----------------------
|
||||
+ t
|
||||
+(1 row)
|
||||
+
|
||||
+SET ROLE mallory_the_masked_user;
|
||||
+SELECT COUNT(*)=1 FROM anon.pg_masking_rules;
|
||||
+ ?column?
|
||||
+----------
|
||||
+ t
|
||||
+(1 row)
|
||||
+
|
||||
+-- SHOULD FAIL
|
||||
+SAVEPOINT fail_seclabel_on_role;
|
||||
+SECURITY LABEL FOR anon ON ROLE mallory_the_masked_user IS NULL;
|
||||
+ERROR: permission denied
|
||||
+DETAIL: The current user must have the CREATEROLE attribute.
|
||||
+ROLLBACK TO fail_seclabel_on_role;
|
||||
+ROLLBACK;
|
||||
diff --git a/ext-src/pg_anon-src/tests/expected/permissions_owner_1.out b/ext-src/pg_anon-src/tests/expected/permissions_owner_1.out
|
||||
new file mode 100644
|
||||
index 0000000..8b090fe
|
||||
--- /dev/null
|
||||
+++ b/ext-src/pg_anon-src/tests/expected/permissions_owner_1.out
|
||||
@@ -0,0 +1,104 @@
|
||||
+BEGIN;
|
||||
+CREATE EXTENSION anon CASCADE;
|
||||
+NOTICE: installing required extension "pgcrypto"
|
||||
+SELECT anon.init();
|
||||
+ init
|
||||
+------
|
||||
+ t
|
||||
+(1 row)
|
||||
+
|
||||
+CREATE ROLE oscar_the_owner;
|
||||
+ALTER DATABASE :DBNAME OWNER TO oscar_the_owner;
|
||||
+CREATE ROLE mallory_the_masked_user;
|
||||
+SECURITY LABEL FOR anon ON ROLE mallory_the_masked_user IS 'MASKED';
|
||||
+--
|
||||
+-- We're checking the owner's permissions
|
||||
+--
|
||||
+-- see
|
||||
+-- https://postgresql-anonymizer.readthedocs.io/en/latest/SECURITY/#permissions
|
||||
+--
|
||||
+SET ROLE oscar_the_owner;
|
||||
+SELECT anon.pseudo_first_name(0) IS NOT NULL;
|
||||
+ ?column?
|
||||
+----------
|
||||
+ t
|
||||
+(1 row)
|
||||
+
|
||||
+-- SHOULD FAIL
|
||||
+DO $$
|
||||
+BEGIN
|
||||
+ PERFORM anon.init();
|
||||
+ EXCEPTION WHEN insufficient_privilege
|
||||
+ THEN RAISE NOTICE 'insufficient_privilege';
|
||||
+END$$;
|
||||
+NOTICE: insufficient_privilege
|
||||
+CREATE TABLE t1(i INT);
|
||||
+ALTER TABLE t1 ADD COLUMN t TEXT;
|
||||
+SECURITY LABEL FOR anon ON COLUMN t1.t
|
||||
+IS 'MASKED WITH VALUE NULL';
|
||||
+INSERT INTO t1 VALUES (1,'test');
|
||||
+SELECT anon.anonymize_table('t1');
|
||||
+ anonymize_table
|
||||
+-----------------
|
||||
+ t
|
||||
+(1 row)
|
||||
+
|
||||
+SELECT * FROM t1;
|
||||
+ i | t
|
||||
+---+---
|
||||
+ 1 |
|
||||
+(1 row)
|
||||
+
|
||||
+UPDATE t1 SET t='test' WHERE i=1;
|
||||
+-- SHOULD FAIL
|
||||
+SAVEPOINT fail_start_engine;
|
||||
+SELECT anon.start_dynamic_masking();
|
||||
+ start_dynamic_masking
|
||||
+-----------------------
|
||||
+ t
|
||||
+(1 row)
|
||||
+
|
||||
+ROLLBACK TO fail_start_engine;
|
||||
+RESET ROLE;
|
||||
+SELECT anon.start_dynamic_masking();
|
||||
+ start_dynamic_masking
|
||||
+-----------------------
|
||||
+ t
|
||||
+(1 row)
|
||||
+
|
||||
+SET ROLE oscar_the_owner;
|
||||
+SELECT * FROM t1;
|
||||
+ i | t
|
||||
+---+------
|
||||
+ 1 | test
|
||||
+(1 row)
|
||||
+
|
||||
+--SELECT * FROM mask.t1;
|
||||
+-- SHOULD FAIL
|
||||
+SAVEPOINT fail_stop_engine;
|
||||
+SELECT anon.stop_dynamic_masking();
|
||||
+ERROR: permission denied for schema mask
|
||||
+CONTEXT: SQL statement "DROP VIEW mask.t1;"
|
||||
+PL/pgSQL function anon.mask_drop_view(oid) line 3 at EXECUTE
|
||||
+SQL statement "SELECT anon.mask_drop_view(oid)
|
||||
+ FROM pg_catalog.pg_class
|
||||
+ WHERE relnamespace=quote_ident(pg_catalog.current_setting('anon.sourceschema'))::REGNAMESPACE
|
||||
+ AND relkind IN ('r','p','f')"
|
||||
+PL/pgSQL function anon.stop_dynamic_masking() line 22 at PERFORM
|
||||
+ROLLBACK TO fail_stop_engine;
|
||||
+RESET ROLE;
|
||||
+SELECT anon.stop_dynamic_masking();
|
||||
+NOTICE: The previous priviledges of 'mallory_the_masked_user' are not restored. You need to grant them manually.
|
||||
+ stop_dynamic_masking
|
||||
+----------------------
|
||||
+ t
|
||||
+(1 row)
|
||||
+
|
||||
+SET ROLE oscar_the_owner;
|
||||
+-- SHOULD FAIL
|
||||
+SAVEPOINT fail_seclabel_on_role;
|
||||
+SECURITY LABEL FOR anon ON ROLE mallory_the_masked_user IS NULL;
|
||||
+ERROR: permission denied
|
||||
+DETAIL: The current user must have the CREATEROLE attribute.
|
||||
+ROLLBACK TO fail_seclabel_on_role;
|
||||
+ROLLBACK;
|
||||
@@ -641,26 +641,7 @@ impl ComputeNode {
|
||||
|
||||
let log_directory_path = Path::new(&self.params.pgdata).join("log");
|
||||
let log_directory_path = log_directory_path.to_string_lossy().to_string();
|
||||
|
||||
// Add project_id,endpoint_id tag to identify the logs.
|
||||
//
|
||||
// These ids are passed from cplane,
|
||||
// for backwards compatibility (old computes that don't have them),
|
||||
// we set them to None.
|
||||
// TODO: Clean up this code when all computes have them.
|
||||
let tag: Option<String> = match (
|
||||
pspec.spec.project_id.as_deref(),
|
||||
pspec.spec.endpoint_id.as_deref(),
|
||||
) {
|
||||
(Some(project_id), Some(endpoint_id)) => {
|
||||
Some(format!("{project_id}/{endpoint_id}"))
|
||||
}
|
||||
(Some(project_id), None) => Some(format!("{project_id}/None")),
|
||||
(None, Some(endpoint_id)) => Some(format!("None,{endpoint_id}")),
|
||||
(None, None) => None,
|
||||
};
|
||||
|
||||
configure_audit_rsyslog(log_directory_path.clone(), tag, &remote_endpoint)?;
|
||||
configure_audit_rsyslog(log_directory_path.clone(), "hipaa", &remote_endpoint)?;
|
||||
|
||||
// Launch a background task to clean up the audit logs
|
||||
launch_pgaudit_gc(log_directory_path);
|
||||
|
||||
@@ -50,13 +50,13 @@ fn restart_rsyslog() -> Result<()> {
|
||||
|
||||
pub fn configure_audit_rsyslog(
|
||||
log_directory: String,
|
||||
tag: Option<String>,
|
||||
tag: &str,
|
||||
remote_endpoint: &str,
|
||||
) -> Result<()> {
|
||||
let config_content: String = format!(
|
||||
include_str!("config_template/compute_audit_rsyslog_template.conf"),
|
||||
log_directory = log_directory,
|
||||
tag = tag.unwrap_or("".to_string()),
|
||||
tag = tag,
|
||||
remote_endpoint = remote_endpoint
|
||||
);
|
||||
|
||||
|
||||
@@ -682,10 +682,10 @@ pub mod tenant_conf_defaults {
|
||||
pub const DEFAULT_COMPACTION_SHARD_ANCESTOR: bool = true;
|
||||
|
||||
// This value needs to be tuned to avoid OOM. We have 3/4*CPUs threads for L0 compaction, that's
|
||||
// 3/4*8=6 on most of our pageservers. Compacting 10 layers requires a maximum of
|
||||
// DEFAULT_CHECKPOINT_DISTANCE*10 memory, that's 2560MB. So with this config, we can get a maximum peak
|
||||
// compaction usage of 15360MB.
|
||||
pub const DEFAULT_COMPACTION_UPPER_LIMIT: usize = 10;
|
||||
// 3/4*16=9 on most of our pageservers. Compacting 20 layers requires about 1 GB memory (could
|
||||
// be reduced later by optimizing L0 hole calculation to avoid loading all keys into memory). So
|
||||
// with this config, we can get a maximum peak compaction usage of 9 GB.
|
||||
pub const DEFAULT_COMPACTION_UPPER_LIMIT: usize = 20;
|
||||
// Enable L0 compaction pass and semaphore by default. L0 compaction must be responsive to avoid
|
||||
// read amp.
|
||||
pub const DEFAULT_COMPACTION_L0_FIRST: bool = true;
|
||||
@@ -702,11 +702,8 @@ pub mod tenant_conf_defaults {
|
||||
// Relevant: https://github.com/neondatabase/neon/issues/3394
|
||||
pub const DEFAULT_GC_PERIOD: &str = "1 hr";
|
||||
pub const DEFAULT_IMAGE_CREATION_THRESHOLD: usize = 3;
|
||||
// Currently, any value other than 0 will trigger image layer creation preemption immediately with L0 backpressure
|
||||
// without looking at the exact number of L0 layers.
|
||||
// It was expected to have the following behavior:
|
||||
// > If there are more than threshold * compaction_threshold (that is 3 * 10 in the default config) L0 layers, image
|
||||
// > layer creation will end immediately. Set to 0 to disable.
|
||||
// If there are more than threshold * compaction_threshold (that is 3 * 10 in the default config) L0 layers, image
|
||||
// layer creation will end immediately. Set to 0 to disable.
|
||||
pub const DEFAULT_IMAGE_CREATION_PREEMPT_THRESHOLD: usize = 3;
|
||||
pub const DEFAULT_PITR_INTERVAL: &str = "7 days";
|
||||
pub const DEFAULT_WALRECEIVER_CONNECT_TIMEOUT: &str = "10 seconds";
|
||||
|
||||
13
libs/remote_keys/Cargo.toml
Normal file
13
libs/remote_keys/Cargo.toml
Normal file
@@ -0,0 +1,13 @@
|
||||
[package]
|
||||
name = "remote_keys"
|
||||
version = "0.1.0"
|
||||
edition = "2024"
|
||||
license.workspace = true
|
||||
|
||||
[dependencies]
|
||||
anyhow.workspace = true
|
||||
utils.workspace = true
|
||||
workspace_hack.workspace = true
|
||||
|
||||
[dev-dependencies]
|
||||
rand.workspace = true
|
||||
42
libs/remote_keys/src/lib.rs
Normal file
42
libs/remote_keys/src/lib.rs
Normal file
@@ -0,0 +1,42 @@
|
||||
//! A module that provides a KMS implementation that generates and unwraps the keys.
|
||||
//!
|
||||
|
||||
/// A KMS implementation that does static wrapping and unwrapping of the keys.
|
||||
pub struct NaiveKms {
|
||||
account_id: String,
|
||||
}
|
||||
|
||||
impl NaiveKms {
|
||||
pub fn new(account_id: String) -> Self {
|
||||
Self { account_id }
|
||||
}
|
||||
|
||||
pub fn encrypt(&self, plain: &[u8]) -> anyhow::Result<Vec<u8>> {
|
||||
let wrapped = [self.account_id.as_bytes(), "-wrapped-".as_bytes(), plain].concat();
|
||||
Ok(wrapped)
|
||||
}
|
||||
|
||||
pub fn decrypt(&self, wrapped: &[u8]) -> anyhow::Result<Vec<u8>> {
|
||||
let Some(wrapped) = wrapped.strip_prefix(self.account_id.as_bytes()) else {
|
||||
return Err(anyhow::anyhow!("invalid key"));
|
||||
};
|
||||
let Some(plain) = wrapped.strip_prefix(b"-wrapped-") else {
|
||||
return Err(anyhow::anyhow!("invalid key"));
|
||||
};
|
||||
Ok(plain.to_vec())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_generate_key() {
|
||||
let kms = NaiveKms::new("test-tenant".to_string());
|
||||
let data = rand::random::<[u8; 32]>().to_vec();
|
||||
let encrypted = kms.encrypt(&data).unwrap();
|
||||
let decrypted = kms.decrypt(&encrypted).unwrap();
|
||||
assert_eq!(data, decrypted);
|
||||
}
|
||||
}
|
||||
@@ -13,6 +13,7 @@ aws-smithy-async.workspace = true
|
||||
aws-smithy-types.workspace = true
|
||||
aws-config.workspace = true
|
||||
aws-sdk-s3.workspace = true
|
||||
base64.workspace = true
|
||||
bytes.workspace = true
|
||||
camino = { workspace = true, features = ["serde1"] }
|
||||
humantime-serde.workspace = true
|
||||
@@ -27,6 +28,7 @@ tokio-util = { workspace = true, features = ["compat"] }
|
||||
toml_edit.workspace = true
|
||||
tracing.workspace = true
|
||||
scopeguard.workspace = true
|
||||
md5.workspace = true
|
||||
metrics.workspace = true
|
||||
utils = { path = "../utils", default-features = false }
|
||||
pin-project-lite.workspace = true
|
||||
|
||||
@@ -550,6 +550,19 @@ impl RemoteStorage for AzureBlobStorage {
|
||||
self.download_for_builder(builder, timeout, cancel).await
|
||||
}
|
||||
|
||||
#[allow(unused_variables)]
|
||||
async fn upload_with_encryption(
|
||||
&self,
|
||||
from: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
|
||||
data_size_bytes: usize,
|
||||
to: &RemotePath,
|
||||
metadata: Option<StorageMetadata>,
|
||||
encryption_key: Option<&[u8]>,
|
||||
cancel: &CancellationToken,
|
||||
) -> anyhow::Result<()> {
|
||||
unimplemented!()
|
||||
}
|
||||
|
||||
async fn delete(&self, path: &RemotePath, cancel: &CancellationToken) -> anyhow::Result<()> {
|
||||
self.delete_objects(std::array::from_ref(path), cancel)
|
||||
.await
|
||||
|
||||
@@ -190,6 +190,8 @@ pub struct DownloadOpts {
|
||||
/// timeouts: for something like an index/manifest/heatmap, we should time out faster than
|
||||
/// for layer files
|
||||
pub kind: DownloadKind,
|
||||
/// The encryption key to use for the download.
|
||||
pub encryption_key: Option<Vec<u8>>,
|
||||
}
|
||||
|
||||
pub enum DownloadKind {
|
||||
@@ -204,6 +206,7 @@ impl Default for DownloadOpts {
|
||||
byte_start: Bound::Unbounded,
|
||||
byte_end: Bound::Unbounded,
|
||||
kind: DownloadKind::Large,
|
||||
encryption_key: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -241,6 +244,15 @@ impl DownloadOpts {
|
||||
None => format!("bytes={start}-"),
|
||||
})
|
||||
}
|
||||
|
||||
pub fn with_encryption_key(mut self, encryption_key: Option<impl AsRef<[u8]>>) -> Self {
|
||||
self.encryption_key = encryption_key.map(|k| k.as_ref().to_vec());
|
||||
self
|
||||
}
|
||||
|
||||
pub fn encryption_key(&self) -> Option<&[u8]> {
|
||||
self.encryption_key.as_deref()
|
||||
}
|
||||
}
|
||||
|
||||
/// Storage (potentially remote) API to manage its state.
|
||||
@@ -331,6 +343,19 @@ pub trait RemoteStorage: Send + Sync + 'static {
|
||||
cancel: &CancellationToken,
|
||||
) -> Result<Download, DownloadError>;
|
||||
|
||||
/// Same as upload, but with remote encryption if the backend supports it (e.g. SSE-C on AWS).
|
||||
async fn upload_with_encryption(
|
||||
&self,
|
||||
from: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
|
||||
// S3 PUT request requires the content length to be specified,
|
||||
// otherwise it starts to fail with the concurrent connection count increasing.
|
||||
data_size_bytes: usize,
|
||||
to: &RemotePath,
|
||||
metadata: Option<StorageMetadata>,
|
||||
encryption_key: Option<&[u8]>,
|
||||
cancel: &CancellationToken,
|
||||
) -> anyhow::Result<()>;
|
||||
|
||||
/// Delete a single path from remote storage.
|
||||
///
|
||||
/// If the operation fails because of timeout or cancellation, the root cause of the error will be
|
||||
@@ -615,6 +640,63 @@ impl<Other: RemoteStorage> GenericRemoteStorage<Arc<Other>> {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn upload_with_encryption(
|
||||
&self,
|
||||
from: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
|
||||
data_size_bytes: usize,
|
||||
to: &RemotePath,
|
||||
metadata: Option<StorageMetadata>,
|
||||
encryption_key: Option<&[u8]>,
|
||||
cancel: &CancellationToken,
|
||||
) -> anyhow::Result<()> {
|
||||
match self {
|
||||
Self::LocalFs(s) => {
|
||||
s.upload_with_encryption(
|
||||
from,
|
||||
data_size_bytes,
|
||||
to,
|
||||
metadata,
|
||||
encryption_key,
|
||||
cancel,
|
||||
)
|
||||
.await
|
||||
}
|
||||
Self::AwsS3(s) => {
|
||||
s.upload_with_encryption(
|
||||
from,
|
||||
data_size_bytes,
|
||||
to,
|
||||
metadata,
|
||||
encryption_key,
|
||||
cancel,
|
||||
)
|
||||
.await
|
||||
}
|
||||
Self::AzureBlob(s) => {
|
||||
s.upload_with_encryption(
|
||||
from,
|
||||
data_size_bytes,
|
||||
to,
|
||||
metadata,
|
||||
encryption_key,
|
||||
cancel,
|
||||
)
|
||||
.await
|
||||
}
|
||||
Self::Unreliable(s) => {
|
||||
s.upload_with_encryption(
|
||||
from,
|
||||
data_size_bytes,
|
||||
to,
|
||||
metadata,
|
||||
encryption_key,
|
||||
cancel,
|
||||
)
|
||||
.await
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl GenericRemoteStorage {
|
||||
|
||||
@@ -198,6 +198,10 @@ impl LocalFs {
|
||||
let mut entries = cur_folder.read_dir_utf8()?;
|
||||
while let Some(Ok(entry)) = entries.next() {
|
||||
let file_name = entry.file_name();
|
||||
if file_name.ends_with(".metadata") || file_name.ends_with(".enc") {
|
||||
// ignore metadata and encryption key files
|
||||
continue;
|
||||
}
|
||||
let full_file_name = cur_folder.join(file_name);
|
||||
if full_file_name.as_str().starts_with(prefix) {
|
||||
let file_remote_path = self.local_file_to_relative_path(full_file_name.clone());
|
||||
@@ -218,6 +222,7 @@ impl LocalFs {
|
||||
data_size_bytes: usize,
|
||||
to: &RemotePath,
|
||||
metadata: Option<StorageMetadata>,
|
||||
enctyption_key: Option<&[u8]>,
|
||||
cancel: &CancellationToken,
|
||||
) -> anyhow::Result<()> {
|
||||
let target_file_path = to.with_base(&self.storage_root);
|
||||
@@ -306,6 +311,8 @@ impl LocalFs {
|
||||
)
|
||||
})?;
|
||||
|
||||
// TODO: we might need to make the following writes atomic with the file write operation above
|
||||
|
||||
if let Some(storage_metadata) = metadata {
|
||||
// FIXME: we must not be using metadata much, since this would forget the old metadata
|
||||
// for new writes? or perhaps metadata is sticky; could consider removing if it's never
|
||||
@@ -324,6 +331,15 @@ impl LocalFs {
|
||||
})?;
|
||||
}
|
||||
|
||||
if let Some(encryption_key) = enctyption_key {
|
||||
let encryption_key_path = storage_encryption_key_path(&target_file_path);
|
||||
fs::write(&encryption_key_path, encryption_key).await.with_context(|| {
|
||||
format!(
|
||||
"Failed to write encryption key to the local storage at '{encryption_key_path}'",
|
||||
)
|
||||
})?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@@ -450,6 +466,7 @@ impl RemoteStorage for LocalFs {
|
||||
key: &RemotePath,
|
||||
_cancel: &CancellationToken,
|
||||
) -> Result<ListingObject, DownloadError> {
|
||||
// TODO: check encryption key
|
||||
let target_file_path = key.with_base(&self.storage_root);
|
||||
let metadata = file_metadata(&target_file_path).await?;
|
||||
Ok(ListingObject {
|
||||
@@ -461,34 +478,14 @@ impl RemoteStorage for LocalFs {
|
||||
|
||||
async fn upload(
|
||||
&self,
|
||||
data: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync,
|
||||
data: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
|
||||
data_size_bytes: usize,
|
||||
to: &RemotePath,
|
||||
metadata: Option<StorageMetadata>,
|
||||
cancel: &CancellationToken,
|
||||
) -> anyhow::Result<()> {
|
||||
let cancel = cancel.child_token();
|
||||
|
||||
let op = self.upload0(data, data_size_bytes, to, metadata, &cancel);
|
||||
let mut op = std::pin::pin!(op);
|
||||
|
||||
// race the upload0 to the timeout; if it goes over, do a graceful shutdown
|
||||
let (res, timeout) = tokio::select! {
|
||||
res = &mut op => (res, false),
|
||||
_ = tokio::time::sleep(self.timeout) => {
|
||||
cancel.cancel();
|
||||
(op.await, true)
|
||||
}
|
||||
};
|
||||
|
||||
match res {
|
||||
Err(e) if timeout && TimeoutOrCancel::caused_by_cancel(&e) => {
|
||||
// we caused this cancel (or they happened simultaneously) -- swap it out to
|
||||
// Timeout
|
||||
Err(TimeoutOrCancel::Timeout.into())
|
||||
}
|
||||
res => res,
|
||||
}
|
||||
self.upload_with_encryption(data, data_size_bytes, to, metadata, None, cancel)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn download(
|
||||
@@ -506,6 +503,22 @@ impl RemoteStorage for LocalFs {
|
||||
return Err(DownloadError::Unmodified);
|
||||
}
|
||||
|
||||
let key = match fs::read(storage_encryption_key_path(&target_path)).await {
|
||||
Ok(key) => Some(key),
|
||||
Err(e) if e.kind() == ErrorKind::NotFound => None,
|
||||
Err(e) => {
|
||||
return Err(DownloadError::Other(
|
||||
anyhow::anyhow!(e).context("cannot read encryption key"),
|
||||
));
|
||||
}
|
||||
};
|
||||
|
||||
if key != opts.encryption_key {
|
||||
return Err(DownloadError::Other(anyhow::anyhow!(
|
||||
"encryption key mismatch"
|
||||
)));
|
||||
}
|
||||
|
||||
let mut file = fs::OpenOptions::new()
|
||||
.read(true)
|
||||
.open(&target_path)
|
||||
@@ -551,12 +564,53 @@ impl RemoteStorage for LocalFs {
|
||||
async fn delete(&self, path: &RemotePath, _cancel: &CancellationToken) -> anyhow::Result<()> {
|
||||
let file_path = path.with_base(&self.storage_root);
|
||||
match fs::remove_file(&file_path).await {
|
||||
Ok(()) => Ok(()),
|
||||
Ok(()) => {}
|
||||
// The file doesn't exist. This shouldn't yield an error to mirror S3's behaviour.
|
||||
// See https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObject.html
|
||||
// > If there isn't a null version, Amazon S3 does not remove any objects but will still respond that the command was successful.
|
||||
Err(e) if e.kind() == ErrorKind::NotFound => Ok(()),
|
||||
Err(e) => Err(anyhow::anyhow!(e)),
|
||||
Err(e) if e.kind() == ErrorKind::NotFound => {}
|
||||
Err(e) => return Err(anyhow::anyhow!(e)),
|
||||
};
|
||||
fs::remove_file(&storage_metadata_path(&file_path))
|
||||
.await
|
||||
.ok();
|
||||
fs::remove_file(&storage_encryption_key_path(&file_path))
|
||||
.await
|
||||
.ok();
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[allow(unused_variables)]
|
||||
async fn upload_with_encryption(
|
||||
&self,
|
||||
data: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
|
||||
data_size_bytes: usize,
|
||||
to: &RemotePath,
|
||||
metadata: Option<StorageMetadata>,
|
||||
encryption_key: Option<&[u8]>,
|
||||
cancel: &CancellationToken,
|
||||
) -> anyhow::Result<()> {
|
||||
let cancel = cancel.child_token();
|
||||
|
||||
let op = self.upload0(data, data_size_bytes, to, metadata, encryption_key, &cancel);
|
||||
let mut op = std::pin::pin!(op);
|
||||
|
||||
// race the upload0 to the timeout; if it goes over, do a graceful shutdown
|
||||
let (res, timeout) = tokio::select! {
|
||||
res = &mut op => (res, false),
|
||||
_ = tokio::time::sleep(self.timeout) => {
|
||||
cancel.cancel();
|
||||
(op.await, true)
|
||||
}
|
||||
};
|
||||
|
||||
match res {
|
||||
Err(e) if timeout && TimeoutOrCancel::caused_by_cancel(&e) => {
|
||||
// we caused this cancel (or they happened simultaneously) -- swap it out to
|
||||
// Timeout
|
||||
Err(TimeoutOrCancel::Timeout.into())
|
||||
}
|
||||
res => res,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -591,6 +645,7 @@ impl RemoteStorage for LocalFs {
|
||||
to_path = to_path
|
||||
)
|
||||
})?;
|
||||
// TODO: copy metadata and encryption key
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -609,6 +664,10 @@ fn storage_metadata_path(original_path: &Utf8Path) -> Utf8PathBuf {
|
||||
path_with_suffix_extension(original_path, "metadata")
|
||||
}
|
||||
|
||||
fn storage_encryption_key_path(original_path: &Utf8Path) -> Utf8PathBuf {
|
||||
path_with_suffix_extension(original_path, "enc")
|
||||
}
|
||||
|
||||
async fn create_target_directory(target_file_path: &Utf8Path) -> anyhow::Result<()> {
|
||||
let target_dir = match target_file_path.parent() {
|
||||
Some(parent_dir) => parent_dir,
|
||||
|
||||
@@ -66,7 +66,10 @@ struct GetObjectRequest {
|
||||
key: String,
|
||||
etag: Option<String>,
|
||||
range: Option<String>,
|
||||
/// Base64 encoded SSE-C key for server-side encryption.
|
||||
sse_c_key: Option<Vec<u8>>,
|
||||
}
|
||||
|
||||
impl S3Bucket {
|
||||
/// Creates the S3 storage, errors if incorrect AWS S3 configuration provided.
|
||||
pub async fn new(remote_storage_config: &S3Config, timeout: Duration) -> anyhow::Result<Self> {
|
||||
@@ -257,6 +260,13 @@ impl S3Bucket {
|
||||
builder = builder.if_none_match(etag);
|
||||
}
|
||||
|
||||
if let Some(encryption_key) = request.sse_c_key {
|
||||
builder = builder.sse_customer_algorithm("AES256");
|
||||
builder = builder.sse_customer_key(base64::encode(&encryption_key));
|
||||
builder = builder
|
||||
.sse_customer_key_md5(base64::encode(md5::compute(&encryption_key).as_slice()));
|
||||
}
|
||||
|
||||
let get_object = builder.send();
|
||||
|
||||
let get_object = tokio::select! {
|
||||
@@ -693,12 +703,13 @@ impl RemoteStorage for S3Bucket {
|
||||
})
|
||||
}
|
||||
|
||||
async fn upload(
|
||||
async fn upload_with_encryption(
|
||||
&self,
|
||||
from: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
|
||||
from_size_bytes: usize,
|
||||
to: &RemotePath,
|
||||
metadata: Option<StorageMetadata>,
|
||||
encryption_key: Option<&[u8]>,
|
||||
cancel: &CancellationToken,
|
||||
) -> anyhow::Result<()> {
|
||||
let kind = RequestKind::Put;
|
||||
@@ -709,7 +720,7 @@ impl RemoteStorage for S3Bucket {
|
||||
let body = StreamBody::new(from.map(|x| x.map(Frame::data)));
|
||||
let bytes_stream = ByteStream::new(SdkBody::from_body_1_x(body));
|
||||
|
||||
let upload = self
|
||||
let mut upload = self
|
||||
.client
|
||||
.put_object()
|
||||
.bucket(self.bucket_name.clone())
|
||||
@@ -717,8 +728,17 @@ impl RemoteStorage for S3Bucket {
|
||||
.set_metadata(metadata.map(|m| m.0))
|
||||
.set_storage_class(self.upload_storage_class.clone())
|
||||
.content_length(from_size_bytes.try_into()?)
|
||||
.body(bytes_stream)
|
||||
.send();
|
||||
.body(bytes_stream);
|
||||
|
||||
if let Some(encryption_key) = encryption_key {
|
||||
upload = upload.sse_customer_algorithm("AES256");
|
||||
let base64_key = base64::encode(encryption_key);
|
||||
upload = upload.sse_customer_key(&base64_key);
|
||||
upload = upload
|
||||
.sse_customer_key_md5(base64::encode(md5::compute(encryption_key).as_slice()));
|
||||
}
|
||||
|
||||
let upload = upload.send();
|
||||
|
||||
let upload = tokio::time::timeout(self.timeout, upload);
|
||||
|
||||
@@ -742,6 +762,18 @@ impl RemoteStorage for S3Bucket {
|
||||
}
|
||||
}
|
||||
|
||||
async fn upload(
|
||||
&self,
|
||||
from: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
|
||||
data_size_bytes: usize,
|
||||
to: &RemotePath,
|
||||
metadata: Option<StorageMetadata>,
|
||||
cancel: &CancellationToken,
|
||||
) -> anyhow::Result<()> {
|
||||
self.upload_with_encryption(from, data_size_bytes, to, metadata, None, cancel)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn copy(
|
||||
&self,
|
||||
from: &RemotePath,
|
||||
@@ -801,6 +833,7 @@ impl RemoteStorage for S3Bucket {
|
||||
key: self.relative_path_to_s3_object(from),
|
||||
etag: opts.etag.as_ref().map(|e| e.to_string()),
|
||||
range: opts.byte_range_header(),
|
||||
sse_c_key: opts.encryption_key.clone(),
|
||||
},
|
||||
cancel,
|
||||
)
|
||||
|
||||
@@ -178,6 +178,19 @@ impl RemoteStorage for UnreliableWrapper {
|
||||
self.inner.download(from, opts, cancel).await
|
||||
}
|
||||
|
||||
#[allow(unused_variables)]
|
||||
async fn upload_with_encryption(
|
||||
&self,
|
||||
from: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
|
||||
data_size_bytes: usize,
|
||||
to: &RemotePath,
|
||||
metadata: Option<StorageMetadata>,
|
||||
encryption_key: Option<&[u8]>,
|
||||
cancel: &CancellationToken,
|
||||
) -> anyhow::Result<()> {
|
||||
unimplemented!()
|
||||
}
|
||||
|
||||
async fn delete(&self, path: &RemotePath, cancel: &CancellationToken) -> anyhow::Result<()> {
|
||||
self.delete_inner(path, true, cancel).await
|
||||
}
|
||||
|
||||
@@ -421,7 +421,7 @@ async fn download_is_timeouted(ctx: &mut MaybeEnabledStorage) {
|
||||
))
|
||||
.unwrap();
|
||||
|
||||
let len = upload_large_enough_file(&ctx.client, &path, &cancel).await;
|
||||
let len = upload_large_enough_file(&ctx.client, &path, &cancel, None).await;
|
||||
|
||||
let timeout = std::time::Duration::from_secs(5);
|
||||
|
||||
@@ -500,7 +500,7 @@ async fn download_is_cancelled(ctx: &mut MaybeEnabledStorage) {
|
||||
))
|
||||
.unwrap();
|
||||
|
||||
let file_len = upload_large_enough_file(&ctx.client, &path, &cancel).await;
|
||||
let file_len = upload_large_enough_file(&ctx.client, &path, &cancel, None).await;
|
||||
|
||||
{
|
||||
let stream = ctx
|
||||
@@ -555,6 +555,7 @@ async fn upload_large_enough_file(
|
||||
client: &GenericRemoteStorage,
|
||||
path: &RemotePath,
|
||||
cancel: &CancellationToken,
|
||||
encryption_key: Option<&[u8]>,
|
||||
) -> usize {
|
||||
let header = bytes::Bytes::from_static("remote blob data content".as_bytes());
|
||||
let body = bytes::Bytes::from(vec![0u8; 1024]);
|
||||
@@ -565,9 +566,54 @@ async fn upload_large_enough_file(
|
||||
let contents = futures::stream::iter(contents.map(std::io::Result::Ok));
|
||||
|
||||
client
|
||||
.upload(contents, len, path, None, cancel)
|
||||
.upload_with_encryption(contents, len, path, None, encryption_key, cancel)
|
||||
.await
|
||||
.expect("upload succeeds");
|
||||
|
||||
len
|
||||
}
|
||||
|
||||
#[test_context(MaybeEnabledStorage)]
|
||||
#[tokio::test]
|
||||
async fn encryption_works(ctx: &mut MaybeEnabledStorage) {
|
||||
let MaybeEnabledStorage::Enabled(ctx) = ctx else {
|
||||
return;
|
||||
};
|
||||
|
||||
let cancel = CancellationToken::new();
|
||||
|
||||
let path = RemotePath::new(Utf8Path::new(
|
||||
format!("{}/file_to_copy", ctx.base_prefix).as_str(),
|
||||
))
|
||||
.unwrap();
|
||||
|
||||
let key = rand::random::<[u8; 32]>();
|
||||
let file_len = upload_large_enough_file(&ctx.client, &path, &cancel, Some(&key)).await;
|
||||
|
||||
{
|
||||
let download = ctx
|
||||
.client
|
||||
.download(
|
||||
&path,
|
||||
&DownloadOpts::default().with_encryption_key(Some(&key)),
|
||||
&cancel,
|
||||
)
|
||||
.await
|
||||
.expect("should succeed");
|
||||
let vec = download_to_vec(download).await.expect("should succeed");
|
||||
assert_eq!(vec.len(), file_len);
|
||||
}
|
||||
|
||||
{
|
||||
// Download without encryption key should fail
|
||||
let download = ctx
|
||||
.client
|
||||
.download(&path, &DownloadOpts::default(), &cancel)
|
||||
.await;
|
||||
assert!(download.is_err());
|
||||
}
|
||||
|
||||
let cancel = CancellationToken::new();
|
||||
|
||||
ctx.client.delete_objects(&[path], &cancel).await.unwrap();
|
||||
}
|
||||
|
||||
@@ -17,6 +17,7 @@ anyhow.workspace = true
|
||||
arc-swap.workspace = true
|
||||
async-compression.workspace = true
|
||||
async-stream.workspace = true
|
||||
base64.workspace = true
|
||||
bit_field.workspace = true
|
||||
bincode.workspace = true
|
||||
byteorder.workspace = true
|
||||
@@ -78,11 +79,11 @@ metrics.workspace = true
|
||||
pageserver_api.workspace = true
|
||||
pageserver_client.workspace = true # for ResponseErrorMessageExt TOOD refactor that
|
||||
pageserver_compaction.workspace = true
|
||||
pem.workspace = true
|
||||
postgres_connection.workspace = true
|
||||
postgres_ffi.workspace = true
|
||||
pq_proto.workspace = true
|
||||
remote_storage.workspace = true
|
||||
remote_keys.workspace = true
|
||||
storage_broker.workspace = true
|
||||
tenant_size_model.workspace = true
|
||||
http-utils.workspace = true
|
||||
|
||||
@@ -45,6 +45,7 @@ fn bench_upload_queue_next_ready(c: &mut Criterion) {
|
||||
shard: ShardIndex::new(ShardNumber(1), ShardCount(2)),
|
||||
generation: Generation::Valid(1),
|
||||
file_size: 0,
|
||||
encryption_key: None,
|
||||
};
|
||||
|
||||
// Construct the (initial and uploaded) index with layer0.
|
||||
|
||||
@@ -68,13 +68,6 @@ pub(crate) struct Args {
|
||||
targets: Option<Vec<TenantTimelineId>>,
|
||||
}
|
||||
|
||||
/// State shared by all clients
|
||||
#[derive(Debug)]
|
||||
struct SharedState {
|
||||
start_work_barrier: tokio::sync::Barrier,
|
||||
live_stats: LiveStats,
|
||||
}
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
struct LiveStats {
|
||||
completed_requests: AtomicU64,
|
||||
@@ -247,26 +240,24 @@ async fn main_impl(
|
||||
all_ranges
|
||||
};
|
||||
|
||||
let live_stats = Arc::new(LiveStats::default());
|
||||
|
||||
let num_live_stats_dump = 1;
|
||||
let num_work_sender_tasks = args.num_clients.get() * timelines.len();
|
||||
let num_main_impl = 1;
|
||||
|
||||
let shared_state = Arc::new(SharedState {
|
||||
start_work_barrier: tokio::sync::Barrier::new(
|
||||
num_live_stats_dump + num_work_sender_tasks + num_main_impl,
|
||||
),
|
||||
live_stats: LiveStats::default(),
|
||||
});
|
||||
let cancel = CancellationToken::new();
|
||||
let start_work_barrier = Arc::new(tokio::sync::Barrier::new(
|
||||
num_live_stats_dump + num_work_sender_tasks + num_main_impl,
|
||||
));
|
||||
|
||||
let ss = shared_state.clone();
|
||||
tokio::spawn({
|
||||
let stats = Arc::clone(&live_stats);
|
||||
let start_work_barrier = Arc::clone(&start_work_barrier);
|
||||
async move {
|
||||
ss.start_work_barrier.wait().await;
|
||||
start_work_barrier.wait().await;
|
||||
loop {
|
||||
let start = std::time::Instant::now();
|
||||
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
|
||||
let stats = &ss.live_stats;
|
||||
let completed_requests = stats.completed_requests.swap(0, Ordering::Relaxed);
|
||||
let missed = stats.missed.swap(0, Ordering::Relaxed);
|
||||
let elapsed = start.elapsed();
|
||||
@@ -279,12 +270,14 @@ async fn main_impl(
|
||||
}
|
||||
});
|
||||
|
||||
let cancel = CancellationToken::new();
|
||||
|
||||
let rps_period = args
|
||||
.per_client_rate
|
||||
.map(|rps_limit| Duration::from_secs_f64(1.0 / (rps_limit as f64)));
|
||||
let make_worker: &dyn Fn(WorkerId) -> Pin<Box<dyn Send + Future<Output = ()>>> = &|worker_id| {
|
||||
let ss = shared_state.clone();
|
||||
let cancel = cancel.clone();
|
||||
let live_stats = live_stats.clone();
|
||||
let start_work_barrier = start_work_barrier.clone();
|
||||
let ranges: Vec<KeyRange> = all_ranges
|
||||
.iter()
|
||||
.filter(|r| r.timeline == worker_id.timeline)
|
||||
@@ -294,8 +287,85 @@ async fn main_impl(
|
||||
rand::distributions::weighted::WeightedIndex::new(ranges.iter().map(|v| v.len()))
|
||||
.unwrap();
|
||||
|
||||
let cancel = cancel.clone();
|
||||
Box::pin(async move {
|
||||
client_libpq(args, worker_id, ss, cancel, rps_period, ranges, weights).await
|
||||
let client =
|
||||
pageserver_client::page_service::Client::new(args.page_service_connstring.clone())
|
||||
.await
|
||||
.unwrap();
|
||||
let mut client = client
|
||||
.pagestream(worker_id.timeline.tenant_id, worker_id.timeline.timeline_id)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
start_work_barrier.wait().await;
|
||||
let client_start = Instant::now();
|
||||
let mut ticks_processed = 0;
|
||||
let mut inflight = VecDeque::new();
|
||||
while !cancel.is_cancelled() {
|
||||
// Detect if a request took longer than the RPS rate
|
||||
if let Some(period) = &rps_period {
|
||||
let periods_passed_until_now =
|
||||
usize::try_from(client_start.elapsed().as_micros() / period.as_micros())
|
||||
.unwrap();
|
||||
|
||||
if periods_passed_until_now > ticks_processed {
|
||||
live_stats.missed((periods_passed_until_now - ticks_processed) as u64);
|
||||
}
|
||||
ticks_processed = periods_passed_until_now;
|
||||
}
|
||||
|
||||
while inflight.len() < args.queue_depth.get() {
|
||||
let start = Instant::now();
|
||||
let req = {
|
||||
let mut rng = rand::thread_rng();
|
||||
let r = &ranges[weights.sample(&mut rng)];
|
||||
let key: i128 = rng.gen_range(r.start..r.end);
|
||||
let key = Key::from_i128(key);
|
||||
assert!(key.is_rel_block_key());
|
||||
let (rel_tag, block_no) = key
|
||||
.to_rel_block()
|
||||
.expect("we filter non-rel-block keys out above");
|
||||
PagestreamGetPageRequest {
|
||||
hdr: PagestreamRequest {
|
||||
reqid: 0,
|
||||
request_lsn: if rng.gen_bool(args.req_latest_probability) {
|
||||
Lsn::MAX
|
||||
} else {
|
||||
r.timeline_lsn
|
||||
},
|
||||
not_modified_since: r.timeline_lsn,
|
||||
},
|
||||
rel: rel_tag,
|
||||
blkno: block_no,
|
||||
}
|
||||
};
|
||||
client.getpage_send(req).await.unwrap();
|
||||
inflight.push_back(start);
|
||||
}
|
||||
|
||||
let start = inflight.pop_front().unwrap();
|
||||
client.getpage_recv().await.unwrap();
|
||||
let end = Instant::now();
|
||||
live_stats.request_done();
|
||||
ticks_processed += 1;
|
||||
STATS.with(|stats| {
|
||||
stats
|
||||
.borrow()
|
||||
.lock()
|
||||
.unwrap()
|
||||
.observe(end.duration_since(start))
|
||||
.unwrap();
|
||||
});
|
||||
|
||||
if let Some(period) = &rps_period {
|
||||
let next_at = client_start
|
||||
+ Duration::from_micros(
|
||||
(ticks_processed) as u64 * u64::try_from(period.as_micros()).unwrap(),
|
||||
);
|
||||
tokio::time::sleep_until(next_at.into()).await;
|
||||
}
|
||||
}
|
||||
})
|
||||
};
|
||||
|
||||
@@ -317,7 +387,7 @@ async fn main_impl(
|
||||
};
|
||||
|
||||
info!("waiting for everything to become ready");
|
||||
shared_state.start_work_barrier.wait().await;
|
||||
start_work_barrier.wait().await;
|
||||
info!("work started");
|
||||
if let Some(runtime) = args.runtime {
|
||||
tokio::time::sleep(runtime.into()).await;
|
||||
@@ -346,91 +416,3 @@ async fn main_impl(
|
||||
|
||||
anyhow::Ok(())
|
||||
}
|
||||
|
||||
async fn client_libpq(
|
||||
args: &Args,
|
||||
worker_id: WorkerId,
|
||||
shared_state: Arc<SharedState>,
|
||||
cancel: CancellationToken,
|
||||
rps_period: Option<Duration>,
|
||||
ranges: Vec<KeyRange>,
|
||||
weights: rand::distributions::weighted::WeightedIndex<i128>,
|
||||
) {
|
||||
let client = pageserver_client::page_service::Client::new(args.page_service_connstring.clone())
|
||||
.await
|
||||
.unwrap();
|
||||
let mut client = client
|
||||
.pagestream(worker_id.timeline.tenant_id, worker_id.timeline.timeline_id)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
shared_state.start_work_barrier.wait().await;
|
||||
let client_start = Instant::now();
|
||||
let mut ticks_processed = 0;
|
||||
let mut inflight = VecDeque::new();
|
||||
while !cancel.is_cancelled() {
|
||||
// Detect if a request took longer than the RPS rate
|
||||
if let Some(period) = &rps_period {
|
||||
let periods_passed_until_now =
|
||||
usize::try_from(client_start.elapsed().as_micros() / period.as_micros()).unwrap();
|
||||
|
||||
if periods_passed_until_now > ticks_processed {
|
||||
shared_state
|
||||
.live_stats
|
||||
.missed((periods_passed_until_now - ticks_processed) as u64);
|
||||
}
|
||||
ticks_processed = periods_passed_until_now;
|
||||
}
|
||||
|
||||
while inflight.len() < args.queue_depth.get() {
|
||||
let start = Instant::now();
|
||||
let req = {
|
||||
let mut rng = rand::thread_rng();
|
||||
let r = &ranges[weights.sample(&mut rng)];
|
||||
let key: i128 = rng.gen_range(r.start..r.end);
|
||||
let key = Key::from_i128(key);
|
||||
assert!(key.is_rel_block_key());
|
||||
let (rel_tag, block_no) = key
|
||||
.to_rel_block()
|
||||
.expect("we filter non-rel-block keys out above");
|
||||
PagestreamGetPageRequest {
|
||||
hdr: PagestreamRequest {
|
||||
reqid: 0,
|
||||
request_lsn: if rng.gen_bool(args.req_latest_probability) {
|
||||
Lsn::MAX
|
||||
} else {
|
||||
r.timeline_lsn
|
||||
},
|
||||
not_modified_since: r.timeline_lsn,
|
||||
},
|
||||
rel: rel_tag,
|
||||
blkno: block_no,
|
||||
}
|
||||
};
|
||||
client.getpage_send(req).await.unwrap();
|
||||
inflight.push_back(start);
|
||||
}
|
||||
|
||||
let start = inflight.pop_front().unwrap();
|
||||
client.getpage_recv().await.unwrap();
|
||||
let end = Instant::now();
|
||||
shared_state.live_stats.request_done();
|
||||
ticks_processed += 1;
|
||||
STATS.with(|stats| {
|
||||
stats
|
||||
.borrow()
|
||||
.lock()
|
||||
.unwrap()
|
||||
.observe(end.duration_since(start))
|
||||
.unwrap();
|
||||
});
|
||||
|
||||
if let Some(period) = &rps_period {
|
||||
let next_at = client_start
|
||||
+ Duration::from_micros(
|
||||
(ticks_processed) as u64 * u64::try_from(period.as_micros()).unwrap(),
|
||||
);
|
||||
tokio::time::sleep_until(next_at.into()).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -416,18 +416,8 @@ fn start_pageserver(
|
||||
// The storage_broker::connect call needs to happen inside a tokio runtime thread.
|
||||
let broker_client = WALRECEIVER_RUNTIME
|
||||
.block_on(async {
|
||||
let tls_config = storage_broker::ClientTlsConfig::new().ca_certificates(
|
||||
conf.ssl_ca_certs
|
||||
.iter()
|
||||
.map(pem::encode)
|
||||
.map(storage_broker::Certificate::from_pem),
|
||||
);
|
||||
// Note: we do not attempt connecting here (but validate endpoints sanity).
|
||||
storage_broker::connect(
|
||||
conf.broker_endpoint.clone(),
|
||||
conf.broker_keepalive_interval,
|
||||
tls_config,
|
||||
)
|
||||
storage_broker::connect(conf.broker_endpoint.clone(), conf.broker_keepalive_interval)
|
||||
})
|
||||
.with_context(|| {
|
||||
format!(
|
||||
|
||||
@@ -17,10 +17,9 @@ use once_cell::sync::OnceCell;
|
||||
use pageserver_api::config::{DiskUsageEvictionTaskConfig, MaxVectoredReadBytes};
|
||||
use pageserver_api::models::ImageCompressionAlgorithm;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use pem::Pem;
|
||||
use postgres_backend::AuthType;
|
||||
use remote_storage::{RemotePath, RemoteStorageConfig};
|
||||
use reqwest::Url;
|
||||
use reqwest::{Certificate, Url};
|
||||
use storage_broker::Uri;
|
||||
use utils::id::{NodeId, TimelineId};
|
||||
use utils::logging::{LogFormat, SecretString};
|
||||
@@ -68,8 +67,8 @@ pub struct PageServerConf {
|
||||
/// Period to reload certificate and private key from files.
|
||||
/// Default: 60s.
|
||||
pub ssl_cert_reload_period: Duration,
|
||||
/// Trusted root CA certificates to use in https APIs in PEM format.
|
||||
pub ssl_ca_certs: Vec<Pem>,
|
||||
/// Trusted root CA certificates to use in https APIs.
|
||||
pub ssl_ca_certs: Vec<Certificate>,
|
||||
|
||||
/// Current availability zone. Used for traffic metrics.
|
||||
pub availability_zone: Option<String>,
|
||||
@@ -498,10 +497,7 @@ impl PageServerConf {
|
||||
ssl_ca_certs: match ssl_ca_file {
|
||||
Some(ssl_ca_file) => {
|
||||
let buf = std::fs::read(ssl_ca_file)?;
|
||||
pem::parse_many(&buf)?
|
||||
.into_iter()
|
||||
.filter(|pem| pem.tag() == "CERTIFICATE")
|
||||
.collect()
|
||||
Certificate::from_pem_bundle(&buf)?
|
||||
}
|
||||
None => Vec::new(),
|
||||
},
|
||||
|
||||
@@ -8,7 +8,6 @@ use pageserver_api::upcall_api::{
|
||||
ReAttachRequest, ReAttachResponse, ReAttachResponseTenant, ValidateRequest,
|
||||
ValidateRequestTenant, ValidateResponse,
|
||||
};
|
||||
use reqwest::Certificate;
|
||||
use serde::Serialize;
|
||||
use serde::de::DeserializeOwned;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
@@ -77,8 +76,8 @@ impl StorageControllerUpcallClient {
|
||||
client = client.default_headers(headers);
|
||||
}
|
||||
|
||||
for cert in &conf.ssl_ca_certs {
|
||||
client = client.add_root_certificate(Certificate::from_der(cert.contents())?);
|
||||
for ssl_ca_cert in &conf.ssl_ca_certs {
|
||||
client = client.add_root_certificate(ssl_ca_cert.clone());
|
||||
}
|
||||
|
||||
Ok(Some(Self {
|
||||
|
||||
@@ -192,11 +192,12 @@ pub(crate) use download::{
|
||||
download_index_part, download_initdb_tar_zst, download_tenant_manifest, is_temp_download_file,
|
||||
list_remote_tenant_shards, list_remote_timelines,
|
||||
};
|
||||
use index::GcCompactionState;
|
||||
pub(crate) use index::LayerFileMetadata;
|
||||
use index::{EncryptionKey, EncryptionKeyId, EncryptionKeyPair, GcCompactionState, KeyVersion};
|
||||
use pageserver_api::models::{RelSizeMigration, TimelineArchivalState, TimelineVisibilityState};
|
||||
use pageserver_api::shard::{ShardIndex, TenantShardId};
|
||||
use regex::Regex;
|
||||
use remote_keys::NaiveKms;
|
||||
use remote_storage::{
|
||||
DownloadError, GenericRemoteStorage, ListingMode, RemotePath, TimeoutOrCancel,
|
||||
};
|
||||
@@ -367,6 +368,10 @@ pub(crate) struct RemoteTimelineClient {
|
||||
config: std::sync::RwLock<RemoteTimelineClientConfig>,
|
||||
|
||||
cancel: CancellationToken,
|
||||
|
||||
kms_impl: Option<NaiveKms>,
|
||||
|
||||
key_repo: std::sync::Mutex<HashMap<EncryptionKeyId, EncryptionKeyPair>>,
|
||||
}
|
||||
|
||||
impl Drop for RemoteTimelineClient {
|
||||
@@ -411,6 +416,9 @@ impl RemoteTimelineClient {
|
||||
)),
|
||||
config: std::sync::RwLock::new(RemoteTimelineClientConfig::from(location_conf)),
|
||||
cancel: CancellationToken::new(),
|
||||
// TODO: make this configurable
|
||||
kms_impl: Some(NaiveKms::new(tenant_shard_id.tenant_id.to_string())),
|
||||
key_repo: std::sync::Mutex::new(HashMap::new()),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -727,9 +735,43 @@ impl RemoteTimelineClient {
|
||||
reason: "no need for a downloads gauge",
|
||||
},
|
||||
);
|
||||
let key_pair = if let Some(ref key_id) = layer_metadata.encryption_key {
|
||||
let wrapped_key = {
|
||||
let mut queue = self.upload_queue.lock().unwrap();
|
||||
let upload_queue = queue.initialized_mut().unwrap();
|
||||
let encryption_key_pair =
|
||||
upload_queue.dirty.keys.iter().find(|key| &key.id == key_id);
|
||||
if let Some(encryption_key_pair) = encryption_key_pair {
|
||||
// TODO: also check if we have uploaded the key yet; we should never use a key that is not persisted
|
||||
encryption_key_pair.clone()
|
||||
} else {
|
||||
return Err(DownloadError::Other(anyhow::anyhow!(
|
||||
"Encryption key pair not found in index_part.json"
|
||||
)));
|
||||
}
|
||||
};
|
||||
let Some(kms) = self.kms_impl.as_ref() else {
|
||||
return Err(DownloadError::Other(anyhow::anyhow!(
|
||||
"KMS not configured when downloading encrypted layer file"
|
||||
)));
|
||||
};
|
||||
let plain_key = kms
|
||||
.decrypt(&wrapped_key.key)
|
||||
.context("failed to decrypt encryption key")
|
||||
.map_err(DownloadError::Other)?;
|
||||
Some(EncryptionKeyPair::new(
|
||||
wrapped_key.id,
|
||||
plain_key,
|
||||
wrapped_key.key,
|
||||
))
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
download::download_layer_file(
|
||||
self.conf,
|
||||
&self.storage_impl,
|
||||
key_pair.as_ref(),
|
||||
self.tenant_shard_id,
|
||||
self.timeline_id,
|
||||
layer_file_name,
|
||||
@@ -1250,6 +1292,14 @@ impl RemoteTimelineClient {
|
||||
upload_queue: &mut UploadQueueInitialized,
|
||||
layer: ResidentLayer,
|
||||
) {
|
||||
let key_pair = {
|
||||
if let Some(key_id) = layer.metadata().encryption_key {
|
||||
let guard = self.key_repo.lock().unwrap();
|
||||
Some(guard.get(&key_id).cloned().unwrap())
|
||||
} else {
|
||||
None
|
||||
}
|
||||
};
|
||||
let metadata = layer.metadata();
|
||||
|
||||
upload_queue
|
||||
@@ -1264,7 +1314,7 @@ impl RemoteTimelineClient {
|
||||
"scheduled layer file upload {layer}",
|
||||
);
|
||||
|
||||
let op = UploadOp::UploadLayer(layer, metadata, None);
|
||||
let op = UploadOp::UploadLayer(layer, metadata, key_pair, None);
|
||||
self.metric_begin(&op);
|
||||
upload_queue.queued_operations.push_back(op);
|
||||
}
|
||||
@@ -1446,6 +1496,58 @@ impl RemoteTimelineClient {
|
||||
upload_queue.queued_operations.push_back(op);
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
fn is_kms_enabled(&self) -> bool {
|
||||
self.kms_impl.is_some()
|
||||
}
|
||||
|
||||
pub(crate) fn schedule_generate_encryption_key(
|
||||
self: &Arc<Self>,
|
||||
) -> Result<Option<EncryptionKeyPair>, NotInitialized> {
|
||||
let Some(kms_impl) = self.kms_impl.as_ref() else {
|
||||
return Ok(None);
|
||||
};
|
||||
|
||||
let plain_key = rand::random::<[u8; 32]>().to_vec(); // StdRng is cryptographically secure (?)
|
||||
let wrapped_key = kms_impl.encrypt(&plain_key).unwrap();
|
||||
|
||||
let mut guard = self.upload_queue.lock().unwrap();
|
||||
let upload_queue = guard.initialized_mut()?;
|
||||
|
||||
let last_key = upload_queue.dirty.keys.last();
|
||||
let this_key_version = if let Some(last_key) = last_key {
|
||||
let key_version = EncryptionKeyId {
|
||||
version: last_key.id.version.next(),
|
||||
generation: self.generation,
|
||||
};
|
||||
assert!(key_version > last_key.id); // ensure key version is strictly increasing; no dup key versions
|
||||
key_version
|
||||
} else {
|
||||
EncryptionKeyId {
|
||||
version: KeyVersion(1),
|
||||
generation: self.generation,
|
||||
}
|
||||
};
|
||||
|
||||
let key_pair = EncryptionKeyPair {
|
||||
id: this_key_version.clone(),
|
||||
plain_key: plain_key.clone(),
|
||||
wrapped_key,
|
||||
};
|
||||
|
||||
upload_queue.dirty.keys.push(EncryptionKey {
|
||||
key: plain_key,
|
||||
id: this_key_version,
|
||||
created_at: Utc::now().naive_utc(),
|
||||
});
|
||||
|
||||
self.key_repo.lock().unwrap().insert(this_key_version, key_pair);
|
||||
|
||||
self.schedule_index_upload(upload_queue);
|
||||
|
||||
Ok(Some(key_pair))
|
||||
}
|
||||
|
||||
/// Schedules a compaction update to the remote `index_part.json`.
|
||||
///
|
||||
/// `compacted_from` represent the L0 names which have been `compacted_to` L1 layers.
|
||||
@@ -1454,6 +1556,7 @@ impl RemoteTimelineClient {
|
||||
compacted_from: &[Layer],
|
||||
compacted_to: &[ResidentLayer],
|
||||
) -> Result<(), NotInitialized> {
|
||||
// Use the same key for all layers in a single compaction job
|
||||
let mut guard = self.upload_queue.lock().unwrap();
|
||||
let upload_queue = guard.initialized_mut()?;
|
||||
|
||||
@@ -1715,6 +1818,7 @@ impl RemoteTimelineClient {
|
||||
uploaded.local_path(),
|
||||
&remote_path,
|
||||
uploaded.metadata().file_size,
|
||||
None, // TODO(chi): support encryption for those layer files uploaded using this interface
|
||||
cancel,
|
||||
)
|
||||
.await
|
||||
@@ -1757,6 +1861,8 @@ impl RemoteTimelineClient {
|
||||
adopted_as.metadata().generation,
|
||||
);
|
||||
|
||||
// TODO: support encryption for those layer files uploaded using this interface
|
||||
|
||||
backoff::retry(
|
||||
|| async {
|
||||
upload::copy_timeline_layer(
|
||||
@@ -1977,7 +2083,7 @@ impl RemoteTimelineClient {
|
||||
|
||||
// Prepare upload.
|
||||
match &mut next_op {
|
||||
UploadOp::UploadLayer(layer, meta, mode) => {
|
||||
UploadOp::UploadLayer(layer, meta, _, mode) => {
|
||||
if upload_queue
|
||||
.recently_deleted
|
||||
.remove(&(layer.layer_desc().layer_name().clone(), meta.generation))
|
||||
@@ -2071,7 +2177,7 @@ impl RemoteTimelineClient {
|
||||
// Assert that we don't modify a layer that's referenced by the current index.
|
||||
if cfg!(debug_assertions) {
|
||||
let modified = match &task.op {
|
||||
UploadOp::UploadLayer(layer, layer_metadata, _) => {
|
||||
UploadOp::UploadLayer(layer, layer_metadata, _, _) => {
|
||||
vec![(layer.layer_desc().layer_name(), layer_metadata)]
|
||||
}
|
||||
UploadOp::Delete(delete) => {
|
||||
@@ -2093,7 +2199,7 @@ impl RemoteTimelineClient {
|
||||
}
|
||||
|
||||
let upload_result: anyhow::Result<()> = match &task.op {
|
||||
UploadOp::UploadLayer(layer, layer_metadata, mode) => {
|
||||
UploadOp::UploadLayer(layer, layer_metadata, encryption_key_pair, mode) => {
|
||||
// TODO: check if this mechanism can be removed now that can_bypass() performs
|
||||
// conflict checks during scheduling.
|
||||
if let Some(OpType::FlushDeletion) = mode {
|
||||
@@ -2174,6 +2280,7 @@ impl RemoteTimelineClient {
|
||||
local_path,
|
||||
&remote_path,
|
||||
layer_metadata.file_size,
|
||||
encryption_key_pair.clone(),
|
||||
&self.cancel,
|
||||
)
|
||||
.measure_remote_op(
|
||||
@@ -2324,7 +2431,7 @@ impl RemoteTimelineClient {
|
||||
upload_queue.inprogress_tasks.remove(&task.task_id);
|
||||
|
||||
let lsn_update = match task.op {
|
||||
UploadOp::UploadLayer(_, _, _) => None,
|
||||
UploadOp::UploadLayer(_, _, _, _) => None,
|
||||
UploadOp::UploadMetadata { ref uploaded } => {
|
||||
// the task id is reused as a monotonicity check for storing the "clean"
|
||||
// IndexPart.
|
||||
@@ -2403,7 +2510,7 @@ impl RemoteTimelineClient {
|
||||
)> {
|
||||
use RemoteTimelineClientMetricsCallTrackSize::DontTrackSize;
|
||||
let res = match op {
|
||||
UploadOp::UploadLayer(_, m, _) => (
|
||||
UploadOp::UploadLayer(_, m, _, _) => (
|
||||
RemoteOpFileKind::Layer,
|
||||
RemoteOpKind::Upload,
|
||||
RemoteTimelineClientMetricsCallTrackSize::Bytes(m.file_size),
|
||||
@@ -2787,6 +2894,10 @@ mod tests {
|
||||
for entry in std::fs::read_dir(remote_path).unwrap().flatten() {
|
||||
let entry_name = entry.file_name();
|
||||
let fname = entry_name.to_str().unwrap();
|
||||
if fname.ends_with(".metadata") || fname.ends_with(".enc") {
|
||||
// ignore metadata and encryption key files; should use local_fs APIs instead in the future
|
||||
continue;
|
||||
}
|
||||
found.push(String::from(fname));
|
||||
}
|
||||
found.sort();
|
||||
@@ -2840,6 +2951,8 @@ mod tests {
|
||||
)),
|
||||
config: std::sync::RwLock::new(RemoteTimelineClientConfig::from(&location_conf)),
|
||||
cancel: CancellationToken::new(),
|
||||
kms_impl: None,
|
||||
key_repo: std::sync::Mutex::new(HashMap::new()),
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
@@ -23,7 +23,7 @@ use utils::crashsafe::path_with_suffix_extension;
|
||||
use utils::id::{TenantId, TimelineId};
|
||||
use utils::{backoff, pausable_failpoint};
|
||||
|
||||
use super::index::{IndexPart, LayerFileMetadata};
|
||||
use super::index::{EncryptionKeyPair, IndexPart, LayerFileMetadata};
|
||||
use super::manifest::TenantManifest;
|
||||
use super::{
|
||||
FAILED_DOWNLOAD_WARN_THRESHOLD, FAILED_REMOTE_OP_RETRIES, INITDB_PATH, parse_remote_index_path,
|
||||
@@ -51,6 +51,7 @@ use crate::virtual_file::{MaybeFatalIo, VirtualFile, on_fatal_io_error};
|
||||
pub async fn download_layer_file<'a>(
|
||||
conf: &'static PageServerConf,
|
||||
storage: &'a GenericRemoteStorage,
|
||||
key_pair: Option<&'a EncryptionKeyPair>,
|
||||
tenant_shard_id: TenantShardId,
|
||||
timeline_id: TimelineId,
|
||||
layer_file_name: &'a LayerName,
|
||||
@@ -86,7 +87,16 @@ pub async fn download_layer_file<'a>(
|
||||
|
||||
let bytes_amount = download_retry(
|
||||
|| async {
|
||||
download_object(storage, &remote_path, &temp_file_path, gate, cancel, ctx).await
|
||||
download_object(
|
||||
storage,
|
||||
key_pair,
|
||||
&remote_path,
|
||||
&temp_file_path,
|
||||
gate,
|
||||
cancel,
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
},
|
||||
&format!("download {remote_path:?}"),
|
||||
cancel,
|
||||
@@ -145,6 +155,7 @@ pub async fn download_layer_file<'a>(
|
||||
/// The unlinking has _not_ been made durable.
|
||||
async fn download_object(
|
||||
storage: &GenericRemoteStorage,
|
||||
encryption_key_pair: Option<&EncryptionKeyPair>,
|
||||
src_path: &RemotePath,
|
||||
dst_path: &Utf8PathBuf,
|
||||
#[cfg_attr(target_os = "macos", allow(unused_variables))] gate: &utils::sync::gate::Gate,
|
||||
@@ -160,9 +171,12 @@ async fn download_object(
|
||||
.with_context(|| format!("create a destination file for layer '{dst_path}'"))
|
||||
.map_err(DownloadError::Other)?;
|
||||
|
||||
let download = storage
|
||||
.download(src_path, &DownloadOpts::default(), cancel)
|
||||
.await?;
|
||||
let mut opts = DownloadOpts::default();
|
||||
if let Some(encryption_key_pair) = encryption_key_pair {
|
||||
opts.encryption_key = Some(encryption_key_pair.plain_key.to_vec());
|
||||
}
|
||||
|
||||
let download = storage.download(src_path, &opts, cancel).await?;
|
||||
|
||||
pausable_failpoint!("before-downloading-layer-stream-pausable");
|
||||
|
||||
|
||||
@@ -10,6 +10,8 @@ use pageserver_api::models::AuxFilePolicy;
|
||||
use pageserver_api::models::RelSizeMigration;
|
||||
use pageserver_api::shard::ShardIndex;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_with::base64::Base64;
|
||||
use serde_with::serde_as;
|
||||
use utils::id::TimelineId;
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
@@ -114,6 +116,70 @@ pub struct IndexPart {
|
||||
/// The timestamp when the timeline was marked invisible in synthetic size calculations.
|
||||
#[serde(skip_serializing_if = "Option::is_none", default)]
|
||||
pub(crate) marked_invisible_at: Option<NaiveDateTime>,
|
||||
|
||||
/// The encryption key used to encrypt the timeline layer files.
|
||||
#[serde(skip_serializing_if = "Vec::is_empty", default)]
|
||||
pub(crate) keys: Vec<EncryptionKey>,
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize, Ord, PartialOrd, Hash)]
|
||||
pub struct KeyVersion(pub u32);
|
||||
|
||||
impl KeyVersion {
|
||||
pub fn next(&self) -> Self {
|
||||
Self(self.0 + 1)
|
||||
}
|
||||
}
|
||||
|
||||
/// An identifier for an encryption key. The scope of the key is the timeline (TBD).
|
||||
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize, Ord, PartialOrd, Hash)]
|
||||
pub struct EncryptionKeyId {
|
||||
pub version: KeyVersion,
|
||||
pub generation: Generation,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct EncryptionKeyPair {
|
||||
pub id: EncryptionKeyId,
|
||||
pub plain_key: Vec<u8>,
|
||||
pub wrapped_key: Vec<u8>,
|
||||
}
|
||||
|
||||
impl EncryptionKeyPair {
|
||||
pub fn new(id: EncryptionKeyId, plain_key: Vec<u8>, wrapped_key: Vec<u8>) -> Self {
|
||||
Self {
|
||||
id,
|
||||
plain_key,
|
||||
wrapped_key,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for EncryptionKeyPair {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
let display =
|
||||
base64::display::Base64Display::with_config(&self.wrapped_key, base64::STANDARD);
|
||||
struct DisplayAsDebug<T: std::fmt::Display>(T);
|
||||
impl<T: std::fmt::Display> std::fmt::Debug for DisplayAsDebug<T> {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "{}", self.0)
|
||||
}
|
||||
}
|
||||
f.debug_struct("EncryptionKeyPair")
|
||||
.field("id", &self.id)
|
||||
.field("plain_key", &"<REDACTED>")
|
||||
.field("wrapped_key", &DisplayAsDebug(&display))
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
#[serde_as]
|
||||
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
|
||||
pub struct EncryptionKey {
|
||||
#[serde_as(as = "Base64")]
|
||||
pub key: Vec<u8>,
|
||||
pub id: EncryptionKeyId,
|
||||
pub created_at: NaiveDateTime,
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
|
||||
@@ -142,10 +208,12 @@ impl IndexPart {
|
||||
/// - 12: +l2_lsn
|
||||
/// - 13: +gc_compaction
|
||||
/// - 14: +marked_invisible_at
|
||||
const LATEST_VERSION: usize = 14;
|
||||
/// - 15: +keys and encryption_key in layer_metadata
|
||||
const LATEST_VERSION: usize = 15;
|
||||
|
||||
// Versions we may see when reading from a bucket.
|
||||
pub const KNOWN_VERSIONS: &'static [usize] = &[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14];
|
||||
pub const KNOWN_VERSIONS: &'static [usize] =
|
||||
&[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15];
|
||||
|
||||
pub const FILE_NAME: &'static str = "index_part.json";
|
||||
|
||||
@@ -165,6 +233,7 @@ impl IndexPart {
|
||||
l2_lsn: None,
|
||||
gc_compaction: None,
|
||||
marked_invisible_at: None,
|
||||
keys: Vec::new(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -205,14 +274,16 @@ impl IndexPart {
|
||||
/// Check for invariants in the index: this is useful when uploading an index to ensure that if
|
||||
/// we encounter a bug, we do not persist buggy metadata.
|
||||
pub(crate) fn validate(&self) -> Result<(), String> {
|
||||
if self.import_pgdata.is_none()
|
||||
&& self.metadata.ancestor_timeline().is_none()
|
||||
&& self.layer_metadata.is_empty()
|
||||
{
|
||||
// Unless we're in the middle of a raw pgdata import, or this is a child timeline,the index must
|
||||
// always have at least one layer.
|
||||
return Err("Index has no ancestor and no layers".to_string());
|
||||
}
|
||||
// We have to disable this check: we might need to upload an empty index part with new keys, or new `reldirv2` flag.
|
||||
|
||||
// if self.import_pgdata.is_none()
|
||||
// && self.metadata.ancestor_timeline().is_none()
|
||||
// && self.layer_metadata.is_empty()
|
||||
// {
|
||||
// // Unless we're in the middle of a raw pgdata import, or this is a child timeline,the index must
|
||||
// // always have at least one layer.
|
||||
// return Err("Index has no ancestor and no layers".to_string());
|
||||
// }
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -222,7 +293,7 @@ impl IndexPart {
|
||||
///
|
||||
/// Fields have to be `Option`s because remote [`IndexPart`]'s can be from different version, which
|
||||
/// might have less or more metadata depending if upgrading or rolling back an upgrade.
|
||||
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
||||
pub struct LayerFileMetadata {
|
||||
pub file_size: u64,
|
||||
|
||||
@@ -233,6 +304,9 @@ pub struct LayerFileMetadata {
|
||||
#[serde(default = "ShardIndex::unsharded")]
|
||||
#[serde(skip_serializing_if = "ShardIndex::is_unsharded")]
|
||||
pub shard: ShardIndex,
|
||||
|
||||
#[serde(skip_serializing_if = "Option::is_none", default)]
|
||||
pub encryption_key: Option<EncryptionKeyId>,
|
||||
}
|
||||
|
||||
impl LayerFileMetadata {
|
||||
@@ -241,6 +315,7 @@ impl LayerFileMetadata {
|
||||
file_size,
|
||||
generation,
|
||||
shard,
|
||||
encryption_key: None,
|
||||
}
|
||||
}
|
||||
/// Helper to get both generation and file size in a tuple
|
||||
@@ -453,14 +528,16 @@ mod tests {
|
||||
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), LayerFileMetadata {
|
||||
file_size: 25600000,
|
||||
generation: Generation::none(),
|
||||
shard: ShardIndex::unsharded()
|
||||
shard: ShardIndex::unsharded(),
|
||||
encryption_key: None,
|
||||
}),
|
||||
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), LayerFileMetadata {
|
||||
// serde_json should always parse this but this might be a double with jq for
|
||||
// example.
|
||||
file_size: 9007199254741001,
|
||||
generation: Generation::none(),
|
||||
shard: ShardIndex::unsharded()
|
||||
shard: ShardIndex::unsharded(),
|
||||
encryption_key: None,
|
||||
})
|
||||
]),
|
||||
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
|
||||
@@ -475,6 +552,7 @@ mod tests {
|
||||
l2_lsn: None,
|
||||
gc_compaction: None,
|
||||
marked_invisible_at: None,
|
||||
keys: Vec::new(),
|
||||
};
|
||||
|
||||
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
|
||||
@@ -502,14 +580,16 @@ mod tests {
|
||||
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), LayerFileMetadata {
|
||||
file_size: 25600000,
|
||||
generation: Generation::none(),
|
||||
shard: ShardIndex::unsharded()
|
||||
shard: ShardIndex::unsharded(),
|
||||
encryption_key: None,
|
||||
}),
|
||||
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), LayerFileMetadata {
|
||||
// serde_json should always parse this but this might be a double with jq for
|
||||
// example.
|
||||
file_size: 9007199254741001,
|
||||
generation: Generation::none(),
|
||||
shard: ShardIndex::unsharded()
|
||||
shard: ShardIndex::unsharded(),
|
||||
encryption_key: None,
|
||||
})
|
||||
]),
|
||||
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
|
||||
@@ -524,6 +604,7 @@ mod tests {
|
||||
l2_lsn: None,
|
||||
gc_compaction: None,
|
||||
marked_invisible_at: None,
|
||||
keys: Vec::new(),
|
||||
};
|
||||
|
||||
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
|
||||
@@ -552,14 +633,16 @@ mod tests {
|
||||
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), LayerFileMetadata {
|
||||
file_size: 25600000,
|
||||
generation: Generation::none(),
|
||||
shard: ShardIndex::unsharded()
|
||||
shard: ShardIndex::unsharded(),
|
||||
encryption_key: None,
|
||||
}),
|
||||
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), LayerFileMetadata {
|
||||
// serde_json should always parse this but this might be a double with jq for
|
||||
// example.
|
||||
file_size: 9007199254741001,
|
||||
generation: Generation::none(),
|
||||
shard: ShardIndex::unsharded()
|
||||
shard: ShardIndex::unsharded(),
|
||||
encryption_key: None,
|
||||
})
|
||||
]),
|
||||
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
|
||||
@@ -574,6 +657,7 @@ mod tests {
|
||||
l2_lsn: None,
|
||||
gc_compaction: None,
|
||||
marked_invisible_at: None,
|
||||
keys: Vec::new(),
|
||||
};
|
||||
|
||||
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
|
||||
@@ -627,6 +711,7 @@ mod tests {
|
||||
l2_lsn: None,
|
||||
gc_compaction: None,
|
||||
marked_invisible_at: None,
|
||||
keys: Vec::new(),
|
||||
};
|
||||
|
||||
let empty_layers_parsed = IndexPart::from_json_bytes(empty_layers_json.as_bytes()).unwrap();
|
||||
@@ -653,14 +738,16 @@ mod tests {
|
||||
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), LayerFileMetadata {
|
||||
file_size: 25600000,
|
||||
generation: Generation::none(),
|
||||
shard: ShardIndex::unsharded()
|
||||
shard: ShardIndex::unsharded(),
|
||||
encryption_key: None,
|
||||
}),
|
||||
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), LayerFileMetadata {
|
||||
// serde_json should always parse this but this might be a double with jq for
|
||||
// example.
|
||||
file_size: 9007199254741001,
|
||||
generation: Generation::none(),
|
||||
shard: ShardIndex::unsharded()
|
||||
shard: ShardIndex::unsharded(),
|
||||
encryption_key: None,
|
||||
})
|
||||
]),
|
||||
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
|
||||
@@ -675,6 +762,7 @@ mod tests {
|
||||
l2_lsn: None,
|
||||
gc_compaction: None,
|
||||
marked_invisible_at: None,
|
||||
keys: Vec::new(),
|
||||
};
|
||||
|
||||
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
|
||||
@@ -703,11 +791,13 @@ mod tests {
|
||||
file_size: 23289856,
|
||||
generation: Generation::new(1),
|
||||
shard: ShardIndex::unsharded(),
|
||||
encryption_key: None,
|
||||
}),
|
||||
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000014EF499-00000000015A7619".parse().unwrap(), LayerFileMetadata {
|
||||
file_size: 1015808,
|
||||
generation: Generation::new(1),
|
||||
shard: ShardIndex::unsharded(),
|
||||
encryption_key: None,
|
||||
})
|
||||
]),
|
||||
disk_consistent_lsn: Lsn::from_str("0/15A7618").unwrap(),
|
||||
@@ -726,6 +816,7 @@ mod tests {
|
||||
l2_lsn: None,
|
||||
gc_compaction: None,
|
||||
marked_invisible_at: None,
|
||||
keys: Vec::new(),
|
||||
};
|
||||
|
||||
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
|
||||
@@ -756,14 +847,16 @@ mod tests {
|
||||
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), LayerFileMetadata {
|
||||
file_size: 25600000,
|
||||
generation: Generation::none(),
|
||||
shard: ShardIndex::unsharded()
|
||||
shard: ShardIndex::unsharded(),
|
||||
encryption_key: None,
|
||||
}),
|
||||
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), LayerFileMetadata {
|
||||
// serde_json should always parse this but this might be a double with jq for
|
||||
// example.
|
||||
file_size: 9007199254741001,
|
||||
generation: Generation::none(),
|
||||
shard: ShardIndex::unsharded()
|
||||
shard: ShardIndex::unsharded(),
|
||||
encryption_key: None,
|
||||
})
|
||||
]),
|
||||
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
|
||||
@@ -782,6 +875,7 @@ mod tests {
|
||||
l2_lsn: None,
|
||||
gc_compaction: None,
|
||||
marked_invisible_at: None,
|
||||
keys: Vec::new(),
|
||||
};
|
||||
|
||||
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
|
||||
@@ -815,12 +909,14 @@ mod tests {
|
||||
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), LayerFileMetadata {
|
||||
file_size: 25600000,
|
||||
generation: Generation::none(),
|
||||
shard: ShardIndex::unsharded()
|
||||
shard: ShardIndex::unsharded(),
|
||||
encryption_key: None,
|
||||
}),
|
||||
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), LayerFileMetadata {
|
||||
file_size: 9007199254741001,
|
||||
generation: Generation::none(),
|
||||
shard: ShardIndex::unsharded()
|
||||
shard: ShardIndex::unsharded(),
|
||||
encryption_key: None,
|
||||
})
|
||||
]),
|
||||
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
|
||||
@@ -843,6 +939,7 @@ mod tests {
|
||||
l2_lsn: None,
|
||||
gc_compaction: None,
|
||||
marked_invisible_at: None,
|
||||
keys: Vec::new(),
|
||||
};
|
||||
|
||||
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
|
||||
@@ -877,12 +974,14 @@ mod tests {
|
||||
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), LayerFileMetadata {
|
||||
file_size: 25600000,
|
||||
generation: Generation::none(),
|
||||
shard: ShardIndex::unsharded()
|
||||
shard: ShardIndex::unsharded(),
|
||||
encryption_key: None,
|
||||
}),
|
||||
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), LayerFileMetadata {
|
||||
file_size: 9007199254741001,
|
||||
generation: Generation::none(),
|
||||
shard: ShardIndex::unsharded()
|
||||
shard: ShardIndex::unsharded(),
|
||||
encryption_key: None,
|
||||
})
|
||||
]),
|
||||
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
|
||||
@@ -905,6 +1004,7 @@ mod tests {
|
||||
l2_lsn: None,
|
||||
gc_compaction: None,
|
||||
marked_invisible_at: None,
|
||||
keys: Vec::new(),
|
||||
};
|
||||
|
||||
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
|
||||
@@ -941,12 +1041,14 @@ mod tests {
|
||||
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), LayerFileMetadata {
|
||||
file_size: 25600000,
|
||||
generation: Generation::none(),
|
||||
shard: ShardIndex::unsharded()
|
||||
shard: ShardIndex::unsharded(),
|
||||
encryption_key: None,
|
||||
}),
|
||||
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), LayerFileMetadata {
|
||||
file_size: 9007199254741001,
|
||||
generation: Generation::none(),
|
||||
shard: ShardIndex::unsharded()
|
||||
shard: ShardIndex::unsharded(),
|
||||
encryption_key: None,
|
||||
})
|
||||
]),
|
||||
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
|
||||
@@ -972,6 +1074,7 @@ mod tests {
|
||||
l2_lsn: None,
|
||||
gc_compaction: None,
|
||||
marked_invisible_at: None,
|
||||
keys: Vec::new(),
|
||||
};
|
||||
|
||||
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
|
||||
@@ -1017,12 +1120,14 @@ mod tests {
|
||||
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), LayerFileMetadata {
|
||||
file_size: 25600000,
|
||||
generation: Generation::none(),
|
||||
shard: ShardIndex::unsharded()
|
||||
shard: ShardIndex::unsharded(),
|
||||
encryption_key: None,
|
||||
}),
|
||||
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), LayerFileMetadata {
|
||||
file_size: 9007199254741001,
|
||||
generation: Generation::none(),
|
||||
shard: ShardIndex::unsharded()
|
||||
shard: ShardIndex::unsharded(),
|
||||
encryption_key: None,
|
||||
})
|
||||
]),
|
||||
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
|
||||
@@ -1052,6 +1157,7 @@ mod tests {
|
||||
l2_lsn: None,
|
||||
gc_compaction: None,
|
||||
marked_invisible_at: None,
|
||||
keys: Vec::new(),
|
||||
};
|
||||
|
||||
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
|
||||
@@ -1098,12 +1204,14 @@ mod tests {
|
||||
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), LayerFileMetadata {
|
||||
file_size: 25600000,
|
||||
generation: Generation::none(),
|
||||
shard: ShardIndex::unsharded()
|
||||
shard: ShardIndex::unsharded(),
|
||||
encryption_key: None,
|
||||
}),
|
||||
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), LayerFileMetadata {
|
||||
file_size: 9007199254741001,
|
||||
generation: Generation::none(),
|
||||
shard: ShardIndex::unsharded()
|
||||
shard: ShardIndex::unsharded(),
|
||||
encryption_key: None,
|
||||
})
|
||||
]),
|
||||
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
|
||||
@@ -1133,6 +1241,7 @@ mod tests {
|
||||
l2_lsn: None,
|
||||
gc_compaction: None,
|
||||
marked_invisible_at: None,
|
||||
keys: Vec::new(),
|
||||
};
|
||||
|
||||
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
|
||||
@@ -1183,12 +1292,14 @@ mod tests {
|
||||
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), LayerFileMetadata {
|
||||
file_size: 25600000,
|
||||
generation: Generation::none(),
|
||||
shard: ShardIndex::unsharded()
|
||||
shard: ShardIndex::unsharded(),
|
||||
encryption_key: None,
|
||||
}),
|
||||
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), LayerFileMetadata {
|
||||
file_size: 9007199254741001,
|
||||
generation: Generation::none(),
|
||||
shard: ShardIndex::unsharded()
|
||||
shard: ShardIndex::unsharded(),
|
||||
encryption_key: None,
|
||||
})
|
||||
]),
|
||||
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
|
||||
@@ -1220,6 +1331,7 @@ mod tests {
|
||||
last_completed_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
|
||||
}),
|
||||
marked_invisible_at: None,
|
||||
keys: Vec::new(),
|
||||
};
|
||||
|
||||
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
|
||||
@@ -1271,12 +1383,14 @@ mod tests {
|
||||
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), LayerFileMetadata {
|
||||
file_size: 25600000,
|
||||
generation: Generation::none(),
|
||||
shard: ShardIndex::unsharded()
|
||||
shard: ShardIndex::unsharded(),
|
||||
encryption_key: None,
|
||||
}),
|
||||
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), LayerFileMetadata {
|
||||
file_size: 9007199254741001,
|
||||
generation: Generation::none(),
|
||||
shard: ShardIndex::unsharded()
|
||||
shard: ShardIndex::unsharded(),
|
||||
encryption_key: None,
|
||||
})
|
||||
]),
|
||||
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
|
||||
@@ -1308,6 +1422,139 @@ mod tests {
|
||||
last_completed_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
|
||||
}),
|
||||
marked_invisible_at: Some(parse_naive_datetime("2023-07-31T09:00:00.123000000")),
|
||||
keys: Vec::new(),
|
||||
};
|
||||
|
||||
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
|
||||
assert_eq!(part, expected);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn v15_keys_are_parsed() {
|
||||
let example = r#"{
|
||||
"version": 15,
|
||||
"layer_metadata":{
|
||||
"000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9": { "file_size": 25600000, "encryption_key": { "version": 1, "generation": 5 } },
|
||||
"000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51": { "file_size": 9007199254741001, "encryption_key": { "version": 2, "generation": 6 } }
|
||||
},
|
||||
"disk_consistent_lsn":"0/16960E8",
|
||||
"metadata": {
|
||||
"disk_consistent_lsn": "0/16960E8",
|
||||
"prev_record_lsn": "0/1696070",
|
||||
"ancestor_timeline": "e45a7f37d3ee2ff17dc14bf4f4e3f52e",
|
||||
"ancestor_lsn": "0/0",
|
||||
"latest_gc_cutoff_lsn": "0/1696070",
|
||||
"initdb_lsn": "0/1696070",
|
||||
"pg_version": 14
|
||||
},
|
||||
"gc_blocking": {
|
||||
"started_at": "2024-07-19T09:00:00.123",
|
||||
"reasons": ["DetachAncestor"]
|
||||
},
|
||||
"import_pgdata": {
|
||||
"V1": {
|
||||
"Done": {
|
||||
"idempotency_key": "specified-by-client-218a5213-5044-4562-a28d-d024c5f057f5",
|
||||
"started_at": "2024-11-13T09:23:42.123",
|
||||
"finished_at": "2024-11-13T09:42:23.123"
|
||||
}
|
||||
}
|
||||
},
|
||||
"rel_size_migration": "legacy",
|
||||
"l2_lsn": "0/16960E8",
|
||||
"gc_compaction": {
|
||||
"last_completed_lsn": "0/16960E8"
|
||||
},
|
||||
"marked_invisible_at": "2023-07-31T09:00:00.123",
|
||||
"keys": [
|
||||
{
|
||||
"key": "dGVzdF9rZXk=",
|
||||
"id": {
|
||||
"version": 1,
|
||||
"generation": 5
|
||||
},
|
||||
"created_at": "2024-07-19T09:00:00.123"
|
||||
},
|
||||
{
|
||||
"key": "dGVzdF9rZXlfMg==",
|
||||
"id": {
|
||||
"version": 2,
|
||||
"generation": 6
|
||||
},
|
||||
"created_at": "2024-07-19T10:00:00.123"
|
||||
}
|
||||
]
|
||||
}"#;
|
||||
|
||||
let expected = IndexPart {
|
||||
version: 15,
|
||||
layer_metadata: HashMap::from([
|
||||
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), LayerFileMetadata {
|
||||
file_size: 25600000,
|
||||
generation: Generation::none(),
|
||||
shard: ShardIndex::unsharded(),
|
||||
encryption_key: Some(EncryptionKeyId {
|
||||
version: KeyVersion(1),
|
||||
generation: Generation::Valid(5),
|
||||
}),
|
||||
}),
|
||||
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), LayerFileMetadata {
|
||||
file_size: 9007199254741001,
|
||||
generation: Generation::none(),
|
||||
shard: ShardIndex::unsharded(),
|
||||
encryption_key: Some(EncryptionKeyId {
|
||||
version: KeyVersion(2),
|
||||
generation: Generation::Valid(6),
|
||||
}),
|
||||
})
|
||||
]),
|
||||
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
|
||||
metadata: TimelineMetadata::new(
|
||||
Lsn::from_str("0/16960E8").unwrap(),
|
||||
Some(Lsn::from_str("0/1696070").unwrap()),
|
||||
Some(TimelineId::from_str("e45a7f37d3ee2ff17dc14bf4f4e3f52e").unwrap()),
|
||||
Lsn::INVALID,
|
||||
Lsn::from_str("0/1696070").unwrap(),
|
||||
Lsn::from_str("0/1696070").unwrap(),
|
||||
14,
|
||||
).with_recalculated_checksum().unwrap(),
|
||||
deleted_at: None,
|
||||
lineage: Default::default(),
|
||||
gc_blocking: Some(GcBlocking {
|
||||
started_at: parse_naive_datetime("2024-07-19T09:00:00.123000000"),
|
||||
reasons: enumset::EnumSet::from_iter([GcBlockingReason::DetachAncestor]),
|
||||
}),
|
||||
last_aux_file_policy: Default::default(),
|
||||
archived_at: None,
|
||||
import_pgdata: Some(import_pgdata::index_part_format::Root::V1(import_pgdata::index_part_format::V1::Done(import_pgdata::index_part_format::Done{
|
||||
started_at: parse_naive_datetime("2024-11-13T09:23:42.123000000"),
|
||||
finished_at: parse_naive_datetime("2024-11-13T09:42:23.123000000"),
|
||||
idempotency_key: import_pgdata::index_part_format::IdempotencyKey::new("specified-by-client-218a5213-5044-4562-a28d-d024c5f057f5".to_string()),
|
||||
}))),
|
||||
rel_size_migration: Some(RelSizeMigration::Legacy),
|
||||
l2_lsn: Some("0/16960E8".parse::<Lsn>().unwrap()),
|
||||
gc_compaction: Some(GcCompactionState {
|
||||
last_completed_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
|
||||
}),
|
||||
marked_invisible_at: Some(parse_naive_datetime("2023-07-31T09:00:00.123000000")),
|
||||
keys: vec![
|
||||
EncryptionKey {
|
||||
key: "test_key".as_bytes().to_vec(),
|
||||
id: EncryptionKeyId {
|
||||
version: KeyVersion(1),
|
||||
generation: Generation::Valid(5),
|
||||
},
|
||||
created_at: parse_naive_datetime("2024-07-19T09:00:00.123000000"),
|
||||
},
|
||||
EncryptionKey {
|
||||
key: "test_key_2".as_bytes().to_vec(),
|
||||
id: EncryptionKeyId {
|
||||
version: KeyVersion(2),
|
||||
generation: Generation::Valid(6),
|
||||
},
|
||||
created_at: parse_naive_datetime("2024-07-19T10:00:00.123000000"),
|
||||
}
|
||||
],
|
||||
};
|
||||
|
||||
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
|
||||
|
||||
@@ -17,7 +17,7 @@ use utils::id::{TenantId, TimelineId};
|
||||
use utils::{backoff, pausable_failpoint};
|
||||
|
||||
use super::Generation;
|
||||
use super::index::IndexPart;
|
||||
use super::index::{EncryptionKeyPair, IndexPart};
|
||||
use super::manifest::TenantManifest;
|
||||
use crate::tenant::remote_timeline_client::{
|
||||
remote_index_path, remote_initdb_archive_path, remote_initdb_preserved_archive_path,
|
||||
@@ -101,6 +101,7 @@ pub(super) async fn upload_timeline_layer<'a>(
|
||||
local_path: &'a Utf8Path,
|
||||
remote_path: &'a RemotePath,
|
||||
metadata_size: u64,
|
||||
encryption_key_pair: Option<EncryptionKeyPair>,
|
||||
cancel: &CancellationToken,
|
||||
) -> anyhow::Result<()> {
|
||||
fail_point!("before-upload-layer", |_| {
|
||||
@@ -144,7 +145,14 @@ pub(super) async fn upload_timeline_layer<'a>(
|
||||
let reader = tokio_util::io::ReaderStream::with_capacity(source_file, super::BUFFER_SIZE);
|
||||
|
||||
storage
|
||||
.upload(reader, fs_size, remote_path, None, cancel)
|
||||
.upload_with_encryption(
|
||||
reader,
|
||||
fs_size,
|
||||
remote_path,
|
||||
None,
|
||||
encryption_key_pair.as_ref().map(|k| k.plain_key.as_slice()),
|
||||
cancel,
|
||||
)
|
||||
.await
|
||||
.with_context(|| format!("upload layer from local path '{local_path}'"))
|
||||
}
|
||||
|
||||
@@ -1310,6 +1310,7 @@ impl<'a> TenantDownloader<'a> {
|
||||
let downloaded_bytes = download_layer_file(
|
||||
self.conf,
|
||||
self.remote_storage,
|
||||
None, // TODO: add encryption key pair
|
||||
*tenant_shard_id,
|
||||
*timeline_id,
|
||||
&layer.name,
|
||||
|
||||
@@ -4864,6 +4864,7 @@ impl Timeline {
|
||||
else {
|
||||
panic!("delta layer cannot be empty if no filter is applied");
|
||||
};
|
||||
|
||||
(
|
||||
// FIXME: even though we have a single image and single delta layer assumption
|
||||
// we push them to vec
|
||||
@@ -6932,9 +6933,7 @@ impl Timeline {
|
||||
}
|
||||
|
||||
// Update remote_timeline_client state to reflect existence of this layer
|
||||
self.remote_client
|
||||
.schedule_layer_file_upload(image_layer)
|
||||
.unwrap();
|
||||
self.remote_client.schedule_layer_file_upload(image_layer)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -6995,9 +6994,7 @@ impl Timeline {
|
||||
}
|
||||
|
||||
// Update remote_timeline_client state to reflect existence of this layer
|
||||
self.remote_client
|
||||
.schedule_layer_file_upload(delta_layer)
|
||||
.unwrap();
|
||||
self.remote_client.schedule_layer_file_upload(delta_layer)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -7,7 +7,7 @@
|
||||
use std::collections::{BinaryHeap, HashMap, HashSet, VecDeque};
|
||||
use std::ops::{Deref, Range};
|
||||
use std::sync::Arc;
|
||||
use std::time::{Duration, Instant};
|
||||
use std::time::{Duration, Instant, SystemTime};
|
||||
|
||||
use super::layer_manager::LayerManager;
|
||||
use super::{
|
||||
@@ -119,32 +119,25 @@ pub struct GcCompactionMetaStatistics {
|
||||
/// The layer size after compaction.
|
||||
pub after_compaction_layer_size: u64,
|
||||
/// The start time of the meta job.
|
||||
pub start_time: Option<chrono::DateTime<chrono::Utc>>,
|
||||
pub start_time: Option<SystemTime>,
|
||||
/// The end time of the meta job.
|
||||
pub end_time: Option<chrono::DateTime<chrono::Utc>>,
|
||||
pub end_time: Option<SystemTime>,
|
||||
/// The duration of the meta job.
|
||||
pub duration_secs: f64,
|
||||
/// The id of the meta job.
|
||||
pub meta_job_id: GcCompactionJobId,
|
||||
/// The LSN below which the layers are compacted, used to compute the statistics.
|
||||
pub below_lsn: Lsn,
|
||||
/// The retention ratio of the meta job (after_compaction_layer_size / before_compaction_layer_size)
|
||||
pub retention_ratio: f64,
|
||||
}
|
||||
|
||||
impl GcCompactionMetaStatistics {
|
||||
fn finalize(&mut self) {
|
||||
let end_time = chrono::Utc::now();
|
||||
let end_time = SystemTime::now();
|
||||
if let Some(start_time) = self.start_time {
|
||||
if end_time > start_time {
|
||||
let delta = end_time - start_time;
|
||||
if let Ok(std_dur) = delta.to_std() {
|
||||
self.duration_secs = std_dur.as_secs_f64();
|
||||
}
|
||||
if let Ok(duration) = end_time.duration_since(start_time) {
|
||||
self.duration_secs = duration.as_secs_f64();
|
||||
}
|
||||
}
|
||||
self.retention_ratio = self.after_compaction_layer_size as f64
|
||||
/ (self.before_compaction_layer_size as f64 + 1.0);
|
||||
self.end_time = Some(end_time);
|
||||
}
|
||||
}
|
||||
@@ -527,7 +520,7 @@ impl GcCompactionQueue {
|
||||
}
|
||||
guard.meta_statistics = Some(GcCompactionMetaStatistics {
|
||||
meta_job_id: id,
|
||||
start_time: Some(chrono::Utc::now()),
|
||||
start_time: Some(SystemTime::now()),
|
||||
before_compaction_layer_size: layer_size,
|
||||
below_lsn: expected_l2_lsn,
|
||||
total_sub_compaction_jobs: jobs_len,
|
||||
@@ -1298,6 +1291,7 @@ impl Timeline {
|
||||
.parts
|
||||
.extend(sparse_partitioning.into_dense().parts);
|
||||
|
||||
|
||||
// 3. Create new image layers for partitions that have been modified "enough".
|
||||
let (image_layers, outcome) = self
|
||||
.create_image_layers(
|
||||
|
||||
@@ -244,7 +244,8 @@ impl RemoteStorageWrapper {
|
||||
kind: DownloadKind::Large,
|
||||
etag: None,
|
||||
byte_start: Bound::Included(start_inclusive),
|
||||
byte_end: Bound::Excluded(end_exclusive)
|
||||
byte_end: Bound::Excluded(end_exclusive),
|
||||
encryption_key: None,
|
||||
},
|
||||
&self.cancel)
|
||||
.await?;
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
//! FIXME: most of this is copy-paste from mgmt_api.rs ; dedupe into a `reqwest_utils::Client` crate.
|
||||
use pageserver_client::mgmt_api::{Error, ResponseErrorMessageExt};
|
||||
use reqwest::{Certificate, Method};
|
||||
use reqwest::Method;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::error;
|
||||
@@ -34,7 +34,7 @@ impl Client {
|
||||
};
|
||||
let mut http_client = reqwest::Client::builder();
|
||||
for cert in &conf.ssl_ca_certs {
|
||||
http_client = http_client.add_root_certificate(Certificate::from_der(cert.contents())?);
|
||||
http_client = http_client.add_root_certificate(cert.clone());
|
||||
}
|
||||
let http_client = http_client.build()?;
|
||||
|
||||
|
||||
@@ -9,6 +9,7 @@ use tracing::info;
|
||||
use utils::generation::Generation;
|
||||
use utils::lsn::{AtomicLsn, Lsn};
|
||||
|
||||
use super::remote_timeline_client::index::EncryptionKeyPair;
|
||||
use super::remote_timeline_client::is_same_remote_layer_path;
|
||||
use super::storage_layer::{AsLayerDesc as _, LayerName, ResidentLayer};
|
||||
use crate::tenant::metadata::TimelineMetadata;
|
||||
@@ -245,7 +246,7 @@ impl UploadQueueInitialized {
|
||||
pub(crate) fn num_inprogress_layer_uploads(&self) -> usize {
|
||||
self.inprogress_tasks
|
||||
.iter()
|
||||
.filter(|(_, t)| matches!(t.op, UploadOp::UploadLayer(_, _, _)))
|
||||
.filter(|(_, t)| matches!(t.op, UploadOp::UploadLayer(_, _, _, _)))
|
||||
.count()
|
||||
}
|
||||
|
||||
@@ -461,7 +462,12 @@ pub struct Delete {
|
||||
#[derive(Clone, Debug)]
|
||||
pub enum UploadOp {
|
||||
/// Upload a layer file. The last field indicates the last operation for thie file.
|
||||
UploadLayer(ResidentLayer, LayerFileMetadata, Option<OpType>),
|
||||
UploadLayer(
|
||||
ResidentLayer,
|
||||
LayerFileMetadata,
|
||||
Option<EncryptionKeyPair>,
|
||||
Option<OpType>,
|
||||
),
|
||||
|
||||
/// Upload a index_part.json file
|
||||
UploadMetadata {
|
||||
@@ -483,7 +489,7 @@ pub enum UploadOp {
|
||||
impl std::fmt::Display for UploadOp {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
|
||||
match self {
|
||||
UploadOp::UploadLayer(layer, metadata, mode) => {
|
||||
UploadOp::UploadLayer(layer, metadata, _, mode) => {
|
||||
write!(
|
||||
f,
|
||||
"UploadLayer({}, size={:?}, gen={:?}, mode={:?})",
|
||||
@@ -517,13 +523,13 @@ impl UploadOp {
|
||||
(UploadOp::Shutdown, _) | (_, UploadOp::Shutdown) => false,
|
||||
|
||||
// Uploads and deletes can bypass each other unless they're for the same file.
|
||||
(UploadOp::UploadLayer(a, ameta, _), UploadOp::UploadLayer(b, bmeta, _)) => {
|
||||
(UploadOp::UploadLayer(a, ameta, _, _), UploadOp::UploadLayer(b, bmeta, _, _)) => {
|
||||
let aname = &a.layer_desc().layer_name();
|
||||
let bname = &b.layer_desc().layer_name();
|
||||
!is_same_remote_layer_path(aname, ameta, bname, bmeta)
|
||||
}
|
||||
(UploadOp::UploadLayer(u, umeta, _), UploadOp::Delete(d))
|
||||
| (UploadOp::Delete(d), UploadOp::UploadLayer(u, umeta, _)) => {
|
||||
(UploadOp::UploadLayer(u, umeta, _, _), UploadOp::Delete(d))
|
||||
| (UploadOp::Delete(d), UploadOp::UploadLayer(u, umeta, _, _)) => {
|
||||
d.layers.iter().all(|(dname, dmeta)| {
|
||||
!is_same_remote_layer_path(&u.layer_desc().layer_name(), umeta, dname, dmeta)
|
||||
})
|
||||
@@ -539,8 +545,8 @@ impl UploadOp {
|
||||
// Similarly, index uploads can bypass uploads and deletes as long as neither the
|
||||
// uploaded index nor the active index references the file (the latter would be
|
||||
// incorrect use by the caller).
|
||||
(UploadOp::UploadLayer(u, umeta, _), UploadOp::UploadMetadata { uploaded: i })
|
||||
| (UploadOp::UploadMetadata { uploaded: i }, UploadOp::UploadLayer(u, umeta, _)) => {
|
||||
(UploadOp::UploadLayer(u, umeta, _, _), UploadOp::UploadMetadata { uploaded: i })
|
||||
| (UploadOp::UploadMetadata { uploaded: i }, UploadOp::UploadLayer(u, umeta, _, _)) => {
|
||||
let uname = u.layer_desc().layer_name();
|
||||
!i.references(&uname, umeta) && !index.references(&uname, umeta)
|
||||
}
|
||||
@@ -577,7 +583,7 @@ mod tests {
|
||||
fn assert_same_op(a: &UploadOp, b: &UploadOp) {
|
||||
use UploadOp::*;
|
||||
match (a, b) {
|
||||
(UploadLayer(a, ameta, atype), UploadLayer(b, bmeta, btype)) => {
|
||||
(UploadLayer(a, ameta, _, atype), UploadLayer(b, bmeta, _, btype)) => {
|
||||
assert_eq!(a.layer_desc().layer_name(), b.layer_desc().layer_name());
|
||||
assert_eq!(ameta, bmeta);
|
||||
assert_eq!(atype, btype);
|
||||
@@ -641,6 +647,7 @@ mod tests {
|
||||
generation: timeline.generation,
|
||||
shard: timeline.get_shard_index(),
|
||||
file_size: size as u64,
|
||||
encryption_key: None,
|
||||
};
|
||||
make_layer_with_metadata(timeline, name, metadata)
|
||||
}
|
||||
@@ -710,7 +717,7 @@ mod tests {
|
||||
|
||||
// Enqueue non-conflicting upload, delete, and index before and after a barrier.
|
||||
let ops = [
|
||||
UploadOp::UploadLayer(layer0.clone(), layer0.metadata(), None),
|
||||
UploadOp::UploadLayer(layer0.clone(), layer0.metadata(), None, None),
|
||||
UploadOp::Delete(Delete {
|
||||
layers: vec![(layer1.layer_desc().layer_name(), layer1.metadata())],
|
||||
}),
|
||||
@@ -718,7 +725,7 @@ mod tests {
|
||||
uploaded: index.clone(),
|
||||
},
|
||||
UploadOp::Barrier(barrier),
|
||||
UploadOp::UploadLayer(layer2.clone(), layer2.metadata(), None),
|
||||
UploadOp::UploadLayer(layer2.clone(), layer2.metadata(), None, None),
|
||||
UploadOp::Delete(Delete {
|
||||
layers: vec![(layer3.layer_desc().layer_name(), layer3.metadata())],
|
||||
}),
|
||||
@@ -843,9 +850,9 @@ mod tests {
|
||||
);
|
||||
|
||||
let ops = [
|
||||
UploadOp::UploadLayer(layer0a.clone(), layer0a.metadata(), None),
|
||||
UploadOp::UploadLayer(layer0b.clone(), layer0b.metadata(), None),
|
||||
UploadOp::UploadLayer(layer0c.clone(), layer0c.metadata(), None),
|
||||
UploadOp::UploadLayer(layer0a.clone(), layer0a.metadata(), None, None),
|
||||
UploadOp::UploadLayer(layer0b.clone(), layer0b.metadata(), None, None),
|
||||
UploadOp::UploadLayer(layer0c.clone(), layer0c.metadata(), None, None),
|
||||
];
|
||||
|
||||
queue.queued_operations.extend(ops.clone());
|
||||
@@ -882,14 +889,14 @@ mod tests {
|
||||
);
|
||||
|
||||
let ops = [
|
||||
UploadOp::UploadLayer(layer0.clone(), layer0.metadata(), None),
|
||||
UploadOp::UploadLayer(layer0.clone(), layer0.metadata(), None, None),
|
||||
UploadOp::Delete(Delete {
|
||||
layers: vec![
|
||||
(layer0.layer_desc().layer_name(), layer0.metadata()),
|
||||
(layer1.layer_desc().layer_name(), layer1.metadata()),
|
||||
],
|
||||
}),
|
||||
UploadOp::UploadLayer(layer1.clone(), layer1.metadata(), None),
|
||||
UploadOp::UploadLayer(layer1.clone(), layer1.metadata(), None, None),
|
||||
];
|
||||
|
||||
queue.queued_operations.extend(ops.clone());
|
||||
@@ -938,15 +945,15 @@ mod tests {
|
||||
);
|
||||
|
||||
let ops = [
|
||||
UploadOp::UploadLayer(layer0.clone(), layer0.metadata(), None),
|
||||
UploadOp::UploadLayer(layer0.clone(), layer0.metadata(), None, None),
|
||||
UploadOp::Delete(Delete {
|
||||
layers: vec![
|
||||
(layer0.layer_desc().layer_name(), layer0.metadata()),
|
||||
(layer1.layer_desc().layer_name(), layer1.metadata()),
|
||||
],
|
||||
}),
|
||||
UploadOp::UploadLayer(layer1.clone(), layer1.metadata(), None),
|
||||
UploadOp::UploadLayer(layer2.clone(), layer2.metadata(), None),
|
||||
UploadOp::UploadLayer(layer1.clone(), layer1.metadata(), None, None),
|
||||
UploadOp::UploadLayer(layer2.clone(), layer2.metadata(), None, None),
|
||||
UploadOp::Delete(Delete {
|
||||
layers: vec![(layer3.layer_desc().layer_name(), layer3.metadata())],
|
||||
}),
|
||||
@@ -984,9 +991,9 @@ mod tests {
|
||||
);
|
||||
|
||||
let ops = [
|
||||
UploadOp::UploadLayer(layer0.clone(), layer0.metadata(), None),
|
||||
UploadOp::UploadLayer(layer1.clone(), layer1.metadata(), None),
|
||||
UploadOp::UploadLayer(layer2.clone(), layer2.metadata(), None),
|
||||
UploadOp::UploadLayer(layer0.clone(), layer0.metadata(), None, None),
|
||||
UploadOp::UploadLayer(layer1.clone(), layer1.metadata(), None, None),
|
||||
UploadOp::UploadLayer(layer2.clone(), layer2.metadata(), None, None),
|
||||
];
|
||||
|
||||
queue.queued_operations.extend(ops.clone());
|
||||
@@ -1061,15 +1068,15 @@ mod tests {
|
||||
let index2 = index_with(&index1, &layer2);
|
||||
|
||||
let ops = [
|
||||
UploadOp::UploadLayer(layer0.clone(), layer0.metadata(), None),
|
||||
UploadOp::UploadLayer(layer0.clone(), layer0.metadata(), None, None),
|
||||
UploadOp::UploadMetadata {
|
||||
uploaded: index0.clone(),
|
||||
},
|
||||
UploadOp::UploadLayer(layer1.clone(), layer1.metadata(), None),
|
||||
UploadOp::UploadLayer(layer1.clone(), layer1.metadata(), None, None),
|
||||
UploadOp::UploadMetadata {
|
||||
uploaded: index1.clone(),
|
||||
},
|
||||
UploadOp::UploadLayer(layer2.clone(), layer2.metadata(), None),
|
||||
UploadOp::UploadLayer(layer2.clone(), layer2.metadata(), None, None),
|
||||
UploadOp::UploadMetadata {
|
||||
uploaded: index2.clone(),
|
||||
},
|
||||
@@ -1128,7 +1135,7 @@ mod tests {
|
||||
|
||||
let ops = [
|
||||
// Initial upload, with a barrier to prevent index coalescing.
|
||||
UploadOp::UploadLayer(layer.clone(), layer.metadata(), None),
|
||||
UploadOp::UploadLayer(layer.clone(), layer.metadata(), None, None),
|
||||
UploadOp::UploadMetadata {
|
||||
uploaded: index_upload.clone(),
|
||||
},
|
||||
@@ -1177,7 +1184,7 @@ mod tests {
|
||||
|
||||
let ops = [
|
||||
// Initial upload, with a barrier to prevent index coalescing.
|
||||
UploadOp::UploadLayer(layer.clone(), layer.metadata(), None),
|
||||
UploadOp::UploadLayer(layer.clone(), layer.metadata(), None, None),
|
||||
UploadOp::UploadMetadata {
|
||||
uploaded: index_upload.clone(),
|
||||
},
|
||||
@@ -1187,7 +1194,7 @@ mod tests {
|
||||
uploaded: index_deref.clone(),
|
||||
},
|
||||
// Replace and reference the layer.
|
||||
UploadOp::UploadLayer(layer.clone(), layer.metadata(), None),
|
||||
UploadOp::UploadLayer(layer.clone(), layer.metadata(), None, None),
|
||||
UploadOp::UploadMetadata {
|
||||
uploaded: index_ref.clone(),
|
||||
},
|
||||
@@ -1235,7 +1242,7 @@ mod tests {
|
||||
|
||||
// Enqueue non-conflicting upload, delete, and index before and after a shutdown.
|
||||
let ops = [
|
||||
UploadOp::UploadLayer(layer0.clone(), layer0.metadata(), None),
|
||||
UploadOp::UploadLayer(layer0.clone(), layer0.metadata(), None, None),
|
||||
UploadOp::Delete(Delete {
|
||||
layers: vec![(layer1.layer_desc().layer_name(), layer1.metadata())],
|
||||
}),
|
||||
@@ -1243,7 +1250,7 @@ mod tests {
|
||||
uploaded: index.clone(),
|
||||
},
|
||||
UploadOp::Shutdown,
|
||||
UploadOp::UploadLayer(layer2.clone(), layer2.metadata(), None),
|
||||
UploadOp::UploadLayer(layer2.clone(), layer2.metadata(), None, None),
|
||||
UploadOp::Delete(Delete {
|
||||
layers: vec![(layer3.layer_desc().layer_name(), layer3.metadata())],
|
||||
}),
|
||||
@@ -1305,10 +1312,10 @@ mod tests {
|
||||
);
|
||||
|
||||
let ops = [
|
||||
UploadOp::UploadLayer(layer0.clone(), layer0.metadata(), None),
|
||||
UploadOp::UploadLayer(layer1.clone(), layer1.metadata(), None),
|
||||
UploadOp::UploadLayer(layer2.clone(), layer2.metadata(), None),
|
||||
UploadOp::UploadLayer(layer3.clone(), layer3.metadata(), None),
|
||||
UploadOp::UploadLayer(layer0.clone(), layer0.metadata(), None, None),
|
||||
UploadOp::UploadLayer(layer1.clone(), layer1.metadata(), None, None),
|
||||
UploadOp::UploadLayer(layer2.clone(), layer2.metadata(), None, None),
|
||||
UploadOp::UploadLayer(layer3.clone(), layer3.metadata(), None, None),
|
||||
];
|
||||
|
||||
queue.queued_operations.extend(ops.clone());
|
||||
@@ -1359,7 +1366,7 @@ mod tests {
|
||||
.layer_metadata
|
||||
.insert(layer.layer_desc().layer_name(), layer.metadata());
|
||||
vec![
|
||||
UploadOp::UploadLayer(layer.clone(), layer.metadata(), None),
|
||||
UploadOp::UploadLayer(layer.clone(), layer.metadata(), None, None),
|
||||
UploadOp::Delete(Delete {
|
||||
layers: vec![(layer.layer_desc().layer_name(), layer.metadata())],
|
||||
}),
|
||||
@@ -1378,6 +1385,7 @@ mod tests {
|
||||
shard,
|
||||
generation: Generation::Valid(generation),
|
||||
file_size: 0,
|
||||
encryption_key: None,
|
||||
};
|
||||
make_layer_with_metadata(&tli, name, metadata)
|
||||
};
|
||||
|
||||
@@ -24,7 +24,6 @@ use crate::config::{
|
||||
use crate::context::parquet::ParquetUploadArgs;
|
||||
use crate::http::health_server::AppMetrics;
|
||||
use crate::metrics::Metrics;
|
||||
use crate::proxy::conntrack::ConnectionTracking;
|
||||
use crate::rate_limiter::{
|
||||
EndpointRateLimiter, LeakyBucketConfig, RateBucketInfo, WakeComputeRateLimiter,
|
||||
};
|
||||
@@ -419,8 +418,6 @@ pub async fn run() -> anyhow::Result<()> {
|
||||
64,
|
||||
));
|
||||
|
||||
let conntracking = Arc::new(ConnectionTracking::default());
|
||||
|
||||
// client facing tasks. these will exit on error or on cancellation
|
||||
// cancellation returns Ok(())
|
||||
let mut client_tasks = JoinSet::new();
|
||||
@@ -434,7 +431,6 @@ pub async fn run() -> anyhow::Result<()> {
|
||||
cancellation_token.clone(),
|
||||
cancellation_handler.clone(),
|
||||
endpoint_rate_limiter.clone(),
|
||||
conntracking.clone(),
|
||||
));
|
||||
}
|
||||
|
||||
@@ -457,7 +453,6 @@ pub async fn run() -> anyhow::Result<()> {
|
||||
proxy_listener,
|
||||
cancellation_token.clone(),
|
||||
cancellation_handler.clone(),
|
||||
conntracking.clone(),
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -13,7 +13,6 @@ use crate::error::ReportableError;
|
||||
use crate::metrics::{Metrics, NumClientConnectionsGuard};
|
||||
use crate::protocol2::{ConnectHeader, ConnectionInfo, read_proxy_protocol};
|
||||
use crate::proxy::connect_compute::{TcpMechanism, connect_to_compute};
|
||||
use crate::proxy::conntrack::ConnectionTracking;
|
||||
use crate::proxy::handshake::{HandshakeData, handshake};
|
||||
use crate::proxy::passthrough::ProxyPassthrough;
|
||||
use crate::proxy::{
|
||||
@@ -26,7 +25,6 @@ pub async fn task_main(
|
||||
listener: tokio::net::TcpListener,
|
||||
cancellation_token: CancellationToken,
|
||||
cancellation_handler: Arc<CancellationHandler>,
|
||||
conntracking: Arc<ConnectionTracking>,
|
||||
) -> anyhow::Result<()> {
|
||||
scopeguard::defer! {
|
||||
info!("proxy has shut down");
|
||||
@@ -52,7 +50,6 @@ pub async fn task_main(
|
||||
let session_id = uuid::Uuid::new_v4();
|
||||
let cancellation_handler = Arc::clone(&cancellation_handler);
|
||||
let cancellations = cancellations.clone();
|
||||
let conntracking = Arc::clone(&conntracking);
|
||||
|
||||
debug!(protocol = "tcp", %session_id, "accepted new TCP connection");
|
||||
|
||||
@@ -114,7 +111,6 @@ pub async fn task_main(
|
||||
socket,
|
||||
conn_gauge,
|
||||
cancellations,
|
||||
conntracking,
|
||||
)
|
||||
.instrument(ctx.span())
|
||||
.boxed()
|
||||
@@ -171,7 +167,6 @@ pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
|
||||
stream: S,
|
||||
conn_gauge: NumClientConnectionsGuard<'static>,
|
||||
cancellations: tokio_util::task::task_tracker::TaskTracker,
|
||||
conntracking: Arc<ConnectionTracking>,
|
||||
) -> Result<Option<ProxyPassthrough<S>>, ClientRequestError> {
|
||||
debug!(
|
||||
protocol = %ctx.protocol(),
|
||||
@@ -269,7 +264,6 @@ pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
|
||||
compute: node,
|
||||
session_id: ctx.session_id(),
|
||||
cancel: session,
|
||||
conntracking,
|
||||
_req: request_gauge,
|
||||
_conn: conn_gauge,
|
||||
}))
|
||||
|
||||
@@ -1,680 +0,0 @@
|
||||
use std::pin::Pin;
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::{AtomicU8, AtomicUsize, Ordering};
|
||||
use std::task::{Context, Poll};
|
||||
use std::time::SystemTime;
|
||||
use std::{fmt, io};
|
||||
|
||||
use pin_project_lite::pin_project;
|
||||
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
|
||||
|
||||
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
|
||||
pub struct ConnId(usize);
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct ConnectionTracking {
|
||||
conns: clashmap::ClashMap<ConnId, (ConnectionState, SystemTime)>,
|
||||
}
|
||||
|
||||
impl ConnectionTracking {
|
||||
pub fn new_tracker(self: &Arc<Self>) -> ConnectionTracker<Arc<Self>> {
|
||||
let conn_id = self.new_conn_id();
|
||||
ConnectionTracker::new(conn_id, Arc::clone(self))
|
||||
}
|
||||
|
||||
fn new_conn_id(&self) -> ConnId {
|
||||
static NEXT_ID: AtomicUsize = AtomicUsize::new(0);
|
||||
let id = ConnId(NEXT_ID.fetch_add(1, Ordering::Relaxed));
|
||||
self.conns
|
||||
.insert(id, (ConnectionState::Idle, SystemTime::now()));
|
||||
id
|
||||
}
|
||||
|
||||
fn update(&self, conn_id: ConnId, new_state: ConnectionState) {
|
||||
let new_timestamp = SystemTime::now();
|
||||
let old_state = self.conns.insert(conn_id, (new_state, new_timestamp));
|
||||
|
||||
if let Some((old_state, _old_timestamp)) = old_state {
|
||||
tracing::debug!(?conn_id, %old_state, %new_state, "conntrack: update");
|
||||
} else {
|
||||
tracing::debug!(?conn_id, %new_state, "conntrack: update");
|
||||
}
|
||||
}
|
||||
|
||||
fn remove(&self, conn_id: ConnId) {
|
||||
if let Some((_, (old_state, _old_timestamp))) = self.conns.remove(&conn_id) {
|
||||
tracing::debug!(?conn_id, %old_state, "conntrack: remove");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl StateChangeObserver for Arc<ConnectionTracking> {
|
||||
type ConnId = ConnId;
|
||||
fn change(
|
||||
&self,
|
||||
conn_id: Self::ConnId,
|
||||
_old_state: ConnectionState,
|
||||
new_state: ConnectionState,
|
||||
) {
|
||||
match new_state {
|
||||
ConnectionState::Init
|
||||
| ConnectionState::Idle
|
||||
| ConnectionState::Transaction
|
||||
| ConnectionState::Busy
|
||||
| ConnectionState::Unknown => self.update(conn_id, new_state),
|
||||
ConnectionState::Closed => self.remove(conn_id),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Called by `ConnectionTracker` whenever the `ConnectionState` changed.
|
||||
pub trait StateChangeObserver {
|
||||
/// Identifier of the connection passed back on state change.
|
||||
type ConnId: Copy;
|
||||
/// Called iff the connection's state changed.
|
||||
fn change(&self, conn_id: Self::ConnId, old_state: ConnectionState, new_state: ConnectionState);
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Hash)]
|
||||
#[repr(u8)]
|
||||
pub enum ConnectionState {
|
||||
#[default]
|
||||
Init = 0,
|
||||
Idle = 1,
|
||||
Transaction = 2,
|
||||
Busy = 3,
|
||||
Closed = 4,
|
||||
Unknown = 5,
|
||||
}
|
||||
|
||||
impl ConnectionState {
|
||||
const fn into_repr(self) -> u8 {
|
||||
self as u8
|
||||
}
|
||||
|
||||
const fn from_repr(value: u8) -> Option<Self> {
|
||||
Some(match value {
|
||||
0 => Self::Init,
|
||||
1 => Self::Idle,
|
||||
2 => Self::Transaction,
|
||||
3 => Self::Busy,
|
||||
4 => Self::Closed,
|
||||
5 => Self::Unknown,
|
||||
_ => return None,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Display for ConnectionState {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
match *self {
|
||||
ConnectionState::Init => f.write_str("init"),
|
||||
ConnectionState::Idle => f.write_str("idle"),
|
||||
ConnectionState::Transaction => f.write_str("transaction"),
|
||||
ConnectionState::Busy => f.write_str("busy"),
|
||||
ConnectionState::Closed => f.write_str("closed"),
|
||||
ConnectionState::Unknown => f.write_str("unknown"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Stores the `ConnectionState`. Used by ConnectionTracker to avoid needing
|
||||
/// mutable references.
|
||||
#[derive(Debug, Default)]
|
||||
struct AtomicConnectionState(AtomicU8);
|
||||
|
||||
impl AtomicConnectionState {
|
||||
fn set(&self, state: ConnectionState) {
|
||||
self.0.store(state.into_repr(), Ordering::Relaxed);
|
||||
}
|
||||
|
||||
fn get(&self) -> ConnectionState {
|
||||
ConnectionState::from_repr(self.0.load(Ordering::Relaxed)).expect("only valid variants")
|
||||
}
|
||||
}
|
||||
|
||||
/// Tracks the `ConnectionState` of a connection by inspecting the frontend and
|
||||
/// backend stream and reacting to specific messages. Used in combination with
|
||||
/// two `TrackedStream`s.
|
||||
pub struct ConnectionTracker<SCO: StateChangeObserver> {
|
||||
state: AtomicConnectionState,
|
||||
observer: SCO,
|
||||
conn_id: SCO::ConnId,
|
||||
}
|
||||
|
||||
impl<SCO: StateChangeObserver> Drop for ConnectionTracker<SCO> {
|
||||
fn drop(&mut self) {
|
||||
self.observer
|
||||
.change(self.conn_id, self.state.get(), ConnectionState::Closed);
|
||||
}
|
||||
}
|
||||
|
||||
impl<SCO: StateChangeObserver> ConnectionTracker<SCO> {
|
||||
pub fn new(conn_id: SCO::ConnId, observer: SCO) -> Self {
|
||||
ConnectionTracker {
|
||||
conn_id,
|
||||
state: AtomicConnectionState::default(),
|
||||
observer,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn frontend_message_tag(&self, tag: Tag) {
|
||||
self.update_state(|old_state| Self::state_from_frontend_tag(old_state, tag));
|
||||
}
|
||||
|
||||
pub fn backend_message_tag(&self, tag: Tag) {
|
||||
self.update_state(|old_state| Self::state_from_backend_tag(old_state, tag));
|
||||
}
|
||||
|
||||
fn update_state(&self, new_state_fn: impl FnOnce(ConnectionState) -> ConnectionState) {
|
||||
let old_state = self.state.get();
|
||||
let new_state = new_state_fn(old_state);
|
||||
if old_state != new_state {
|
||||
self.observer.change(self.conn_id, old_state, new_state);
|
||||
self.state.set(new_state);
|
||||
}
|
||||
}
|
||||
|
||||
fn state_from_frontend_tag(_old_state: ConnectionState, fe_tag: Tag) -> ConnectionState {
|
||||
// Most activity from the client puts connection into busy state.
|
||||
// Only the server can put a connection back into idle state.
|
||||
match fe_tag {
|
||||
Tag::Start | Tag::ReadyForQuery(_) | Tag::Message(_) => ConnectionState::Busy,
|
||||
Tag::End => ConnectionState::Closed,
|
||||
Tag::Lost => ConnectionState::Unknown,
|
||||
}
|
||||
}
|
||||
|
||||
fn state_from_backend_tag(old_state: ConnectionState, be_tag: Tag) -> ConnectionState {
|
||||
match be_tag {
|
||||
// Check for RFQ and put connection into idle or idle in transaction state.
|
||||
Tag::ReadyForQuery(b'I') => ConnectionState::Idle,
|
||||
Tag::ReadyForQuery(b'T') => ConnectionState::Transaction,
|
||||
Tag::ReadyForQuery(b'E') => ConnectionState::Transaction,
|
||||
// We can't put a connection into idle state for unknown RFQ status.
|
||||
Tag::ReadyForQuery(_) => ConnectionState::Unknown,
|
||||
// Ignore out-fo message from the server.
|
||||
Tag::NOTICE | Tag::NOTIFICATION_RESPONSE | Tag::PARAMETER_STATUS => old_state,
|
||||
// All other activity from server puts connection into busy state.
|
||||
Tag::Start | Tag::Message(_) => ConnectionState::Busy,
|
||||
|
||||
Tag::End => ConnectionState::Closed,
|
||||
Tag::Lost => ConnectionState::Unknown,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
|
||||
pub enum Tag {
|
||||
Message(u8),
|
||||
ReadyForQuery(u8),
|
||||
Start,
|
||||
End,
|
||||
Lost,
|
||||
}
|
||||
|
||||
impl Tag {
|
||||
const READY_FOR_QUERY: Tag = Tag::Message(b'Z');
|
||||
const NOTICE: Tag = Tag::Message(b'N');
|
||||
const NOTIFICATION_RESPONSE: Tag = Tag::Message(b'A');
|
||||
const PARAMETER_STATUS: Tag = Tag::Message(b'S');
|
||||
}
|
||||
|
||||
impl fmt::Display for Tag {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
match *self {
|
||||
Tag::Start => f.write_str("start"),
|
||||
Tag::End => f.write_str("end"),
|
||||
Tag::Lost => f.write_str("lost"),
|
||||
Tag::Message(tag) => write!(f, "'{}'", tag as char),
|
||||
Tag::ReadyForQuery(status) => write!(f, "ReadyForQuery:'{}'", status as char),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub trait TagObserver {
|
||||
fn observe(&mut self, tag: Tag);
|
||||
}
|
||||
|
||||
impl<F: FnMut(Tag)> TagObserver for F {
|
||||
fn observe(&mut self, tag: Tag) {
|
||||
(self)(tag);
|
||||
}
|
||||
}
|
||||
|
||||
pin_project! {
|
||||
pub struct TrackedStream<S, TO> {
|
||||
#[pin]
|
||||
stream: S,
|
||||
scanner: StreamScanner<TO>,
|
||||
}
|
||||
}
|
||||
|
||||
impl<S: AsyncRead + AsyncWrite + Unpin, TO: TagObserver> TrackedStream<S, TO> {
|
||||
pub const fn new(stream: S, midstream: bool, observer: TO) -> Self {
|
||||
TrackedStream {
|
||||
stream,
|
||||
scanner: StreamScanner::new(midstream, observer),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<S: AsyncRead + Unpin, TO: TagObserver> AsyncRead for TrackedStream<S, TO> {
|
||||
#[inline]
|
||||
fn poll_read(
|
||||
self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
buf: &mut ReadBuf<'_>,
|
||||
) -> Poll<io::Result<()>> {
|
||||
let this = self.project();
|
||||
let old_len = buf.filled().len();
|
||||
match this.stream.poll_read(cx, buf) {
|
||||
Poll::Ready(Ok(())) => {
|
||||
let new_len = buf.filled().len();
|
||||
this.scanner.scan_bytes(&buf.filled()[old_len..new_len]);
|
||||
Poll::Ready(Ok(()))
|
||||
}
|
||||
Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
|
||||
Poll::Pending => Poll::Pending,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<S: AsyncWrite + Unpin, TO> AsyncWrite for TrackedStream<S, TO> {
|
||||
#[inline(always)]
|
||||
fn poll_write(
|
||||
self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
buf: &[u8],
|
||||
) -> Poll<io::Result<usize>> {
|
||||
self.project().stream.poll_write(cx, buf)
|
||||
}
|
||||
|
||||
#[inline(always)]
|
||||
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
|
||||
self.project().stream.poll_flush(cx)
|
||||
}
|
||||
|
||||
#[inline(always)]
|
||||
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
|
||||
self.project().stream.poll_shutdown(cx)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct StreamScanner<TO> {
|
||||
observer: TO,
|
||||
state: StreamScannerState,
|
||||
}
|
||||
|
||||
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
|
||||
enum StreamScannerState {
|
||||
/// Initial state when no message has been read and we are looling for a
|
||||
/// message without a tag.
|
||||
Start,
|
||||
/// Read a message tag.
|
||||
Tag,
|
||||
/// Read the length bytes and calculate the total length.
|
||||
Length {
|
||||
tag: Tag,
|
||||
/// Number of bytes missing to know the full length of the message: 0..=4
|
||||
length_bytes_missing: usize,
|
||||
/// Total length of the message (without tag) that is calculated as we
|
||||
/// read the bytes for the length.
|
||||
calculated_length: usize,
|
||||
},
|
||||
/// Read (= skip) the payload.
|
||||
Payload {
|
||||
tag: Tag,
|
||||
/// If this is the first time payload bytes are read. Important when
|
||||
/// inspecting specific messages, like ReadyForQuery.
|
||||
first: bool,
|
||||
/// Number of payload bytes left to read before looking for a new tag.
|
||||
bytes_to_skip: usize,
|
||||
},
|
||||
/// Stream was terminated.
|
||||
End,
|
||||
/// Stream ended up in a lost state. We only stop tracking the stream, not
|
||||
/// interrupt it.
|
||||
Lost,
|
||||
}
|
||||
|
||||
impl<TO: TagObserver> StreamScanner<TO> {
|
||||
const fn new(midstream: bool, observer: TO) -> Self {
|
||||
StreamScanner {
|
||||
observer,
|
||||
state: if midstream {
|
||||
StreamScannerState::Tag
|
||||
} else {
|
||||
StreamScannerState::Start
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<TO: TagObserver> StreamScanner<TO> {
|
||||
fn scan_bytes(&mut self, mut buf: &[u8]) {
|
||||
use StreamScannerState as S;
|
||||
|
||||
if matches!(self.state, S::End | S::Lost) {
|
||||
return;
|
||||
}
|
||||
if buf.is_empty() {
|
||||
match self.state {
|
||||
S::Start | S::Tag => {
|
||||
self.observer.observe(Tag::End);
|
||||
self.state = S::End;
|
||||
return;
|
||||
}
|
||||
S::Length { .. } | S::Payload { .. } => {
|
||||
self.observer.observe(Tag::Lost);
|
||||
self.state = S::Lost;
|
||||
return;
|
||||
}
|
||||
S::End | S::Lost => unreachable!(),
|
||||
}
|
||||
}
|
||||
|
||||
while !buf.is_empty() {
|
||||
match self.state {
|
||||
S::Start => {
|
||||
self.state = S::Length {
|
||||
tag: Tag::Start,
|
||||
length_bytes_missing: 4,
|
||||
calculated_length: 0,
|
||||
};
|
||||
}
|
||||
|
||||
S::Tag => {
|
||||
let tag = buf.first().copied().expect("buf not empty");
|
||||
buf = &buf[1..];
|
||||
|
||||
self.state = S::Length {
|
||||
tag: Tag::Message(tag),
|
||||
length_bytes_missing: 4,
|
||||
calculated_length: 0,
|
||||
};
|
||||
}
|
||||
|
||||
S::Length {
|
||||
tag,
|
||||
mut length_bytes_missing,
|
||||
mut calculated_length,
|
||||
} => {
|
||||
let consume = length_bytes_missing.min(buf.len());
|
||||
|
||||
let (length_bytes, remainder) = buf.split_at(consume);
|
||||
for b in length_bytes {
|
||||
calculated_length <<= 8;
|
||||
calculated_length |= *b as usize;
|
||||
}
|
||||
buf = remainder;
|
||||
|
||||
length_bytes_missing -= consume;
|
||||
if length_bytes_missing == 0 {
|
||||
let Some(bytes_to_skip) = calculated_length.checked_sub(4) else {
|
||||
self.observer.observe(Tag::Lost);
|
||||
self.state = S::Lost;
|
||||
return;
|
||||
};
|
||||
|
||||
if bytes_to_skip == 0 {
|
||||
self.observer.observe(tag);
|
||||
self.state = S::Tag;
|
||||
} else {
|
||||
self.state = S::Payload {
|
||||
tag,
|
||||
first: true,
|
||||
bytes_to_skip,
|
||||
};
|
||||
}
|
||||
} else {
|
||||
self.state = S::Length {
|
||||
tag,
|
||||
length_bytes_missing,
|
||||
calculated_length,
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
S::Payload {
|
||||
tag,
|
||||
first,
|
||||
mut bytes_to_skip,
|
||||
} => {
|
||||
let consume = bytes_to_skip.min(buf.len());
|
||||
bytes_to_skip -= consume;
|
||||
if bytes_to_skip == 0 {
|
||||
if tag == Tag::READY_FOR_QUERY && first && consume == 1 {
|
||||
let status = buf.first().copied().expect("buf not empty");
|
||||
self.observer.observe(Tag::ReadyForQuery(status));
|
||||
} else {
|
||||
self.observer.observe(tag);
|
||||
}
|
||||
self.state = S::Tag;
|
||||
} else {
|
||||
self.state = S::Payload {
|
||||
tag,
|
||||
first: false,
|
||||
bytes_to_skip,
|
||||
};
|
||||
}
|
||||
buf = &buf[consume..];
|
||||
}
|
||||
|
||||
S::End | S::Lost => unreachable!(),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::cell::RefCell;
|
||||
use std::pin::pin;
|
||||
use std::rc::Rc;
|
||||
use std::sync::{Arc, Mutex};
|
||||
|
||||
use tokio::io::{AsyncReadExt, BufReader};
|
||||
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_stream_scanner() {
|
||||
let tags = Rc::new(RefCell::new(Vec::new()));
|
||||
let observer_tags = tags.clone();
|
||||
let observer = move |tag| {
|
||||
observer_tags.borrow_mut().push(tag);
|
||||
};
|
||||
let mut scanner = StreamScanner::new(false, observer);
|
||||
|
||||
scanner.scan_bytes(&[0, 0]);
|
||||
assert_eq!(tags.borrow().as_slice(), &[]);
|
||||
assert_eq!(
|
||||
scanner.state,
|
||||
StreamScannerState::Length {
|
||||
tag: Tag::Start,
|
||||
length_bytes_missing: 2,
|
||||
calculated_length: 0,
|
||||
}
|
||||
);
|
||||
|
||||
scanner.scan_bytes(&[0x01, 0x01, 0x00]);
|
||||
assert_eq!(tags.borrow().as_slice(), &[]);
|
||||
assert_eq!(
|
||||
scanner.state,
|
||||
StreamScannerState::Payload {
|
||||
tag: Tag::Start,
|
||||
first: false,
|
||||
bytes_to_skip: 0x00000101 - 4 - 1,
|
||||
}
|
||||
);
|
||||
|
||||
scanner.scan_bytes(vec![0; 0x00000101 - 4 - 1 - 1].as_slice());
|
||||
assert_eq!(tags.borrow().as_slice(), &[]);
|
||||
assert_eq!(
|
||||
scanner.state,
|
||||
StreamScannerState::Payload {
|
||||
tag: Tag::Start,
|
||||
first: false,
|
||||
bytes_to_skip: 1,
|
||||
}
|
||||
);
|
||||
|
||||
scanner.scan_bytes(&[0x00, b'A', 0x00, 0x00, 0x00, 0x08]);
|
||||
assert_eq!(tags.borrow().as_slice(), &[Tag::Start]);
|
||||
assert_eq!(
|
||||
scanner.state,
|
||||
StreamScannerState::Payload {
|
||||
tag: Tag::Message(b'A'),
|
||||
first: true,
|
||||
bytes_to_skip: 4,
|
||||
}
|
||||
);
|
||||
|
||||
scanner.scan_bytes(&[0, 0, 0, 0]);
|
||||
assert_eq!(tags.borrow().as_slice(), &[Tag::Start, Tag::Message(b'A')]);
|
||||
assert_eq!(scanner.state, StreamScannerState::Tag);
|
||||
|
||||
scanner.scan_bytes(&[b'Z', 0x00, 0x00, 0x00, 0x05, b'T']);
|
||||
assert_eq!(
|
||||
tags.borrow().as_slice(),
|
||||
&[Tag::Start, Tag::Message(b'A'), Tag::ReadyForQuery(b'T')]
|
||||
);
|
||||
assert_eq!(scanner.state, StreamScannerState::Tag);
|
||||
|
||||
scanner.scan_bytes(&[]);
|
||||
assert_eq!(
|
||||
tags.borrow().as_slice(),
|
||||
&[
|
||||
Tag::Start,
|
||||
Tag::Message(b'A'),
|
||||
Tag::ReadyForQuery(b'T'),
|
||||
Tag::End
|
||||
]
|
||||
);
|
||||
assert_eq!(scanner.state, StreamScannerState::End);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_connection_tracker() {
|
||||
let transitions: Arc<Mutex<Vec<(ConnectionState, ConnectionState)>>> = Arc::default();
|
||||
struct Observer(Arc<Mutex<Vec<(ConnectionState, ConnectionState)>>>);
|
||||
impl StateChangeObserver for Observer {
|
||||
type ConnId = usize;
|
||||
fn change(
|
||||
&self,
|
||||
conn_id: Self::ConnId,
|
||||
old_state: ConnectionState,
|
||||
new_state: ConnectionState,
|
||||
) {
|
||||
assert_eq!(conn_id, 42);
|
||||
self.0.lock().unwrap().push((old_state, new_state));
|
||||
}
|
||||
}
|
||||
let tracker = ConnectionTracker::new(42, Observer(transitions.clone()));
|
||||
|
||||
let stream = TestStream::new(
|
||||
&[
|
||||
0, 0, 0, 4, // Init
|
||||
b'Z', 0, 0, 0, 5, b'I', // Init -> Idle
|
||||
b'x', 0, 0, 0, 4, // Idle -> Busy
|
||||
b'Z', 0, 0, 0, 5, b'I', // Busy -> Idle
|
||||
][..],
|
||||
);
|
||||
// AsyncRead
|
||||
let mut stream = TrackedStream::new(stream, false, |tag| tracker.backend_message_tag(tag));
|
||||
|
||||
let mut readbuf = [0; 2];
|
||||
let n = stream.read_exact(&mut readbuf).await.unwrap();
|
||||
assert_eq!(n, 2);
|
||||
assert_eq!(&readbuf, &[0, 0,]);
|
||||
assert!(transitions.lock().unwrap().is_empty());
|
||||
|
||||
let mut readbuf = [0; 2];
|
||||
let n = stream.read_exact(&mut readbuf).await.unwrap();
|
||||
assert_eq!(n, 2);
|
||||
assert_eq!(&readbuf, &[0, 4]);
|
||||
assert_eq!(
|
||||
transitions.lock().unwrap().as_slice(),
|
||||
&[(ConnectionState::Init, ConnectionState::Busy)]
|
||||
);
|
||||
|
||||
let mut readbuf = [0; 6];
|
||||
let n = stream.read_exact(&mut readbuf).await.unwrap();
|
||||
assert_eq!(n, 6);
|
||||
assert_eq!(&readbuf, &[b'Z', 0, 0, 0, 5, b'I']);
|
||||
assert_eq!(
|
||||
transitions.lock().unwrap().as_slice(),
|
||||
&[
|
||||
(ConnectionState::Init, ConnectionState::Busy),
|
||||
(ConnectionState::Busy, ConnectionState::Idle),
|
||||
]
|
||||
);
|
||||
|
||||
let mut readbuf = [0; 5];
|
||||
let n = stream.read_exact(&mut readbuf).await.unwrap();
|
||||
assert_eq!(n, 5);
|
||||
assert_eq!(&readbuf, &[b'x', 0, 0, 0, 4]);
|
||||
assert_eq!(
|
||||
transitions.lock().unwrap().as_slice(),
|
||||
&[
|
||||
(ConnectionState::Init, ConnectionState::Busy),
|
||||
(ConnectionState::Busy, ConnectionState::Idle),
|
||||
(ConnectionState::Idle, ConnectionState::Busy),
|
||||
]
|
||||
);
|
||||
|
||||
let mut readbuf = [0; 6];
|
||||
let n = stream.read_exact(&mut readbuf).await.unwrap();
|
||||
assert_eq!(n, 6);
|
||||
assert_eq!(&readbuf, &[b'Z', 0, 0, 0, 5, b'I']);
|
||||
assert_eq!(
|
||||
transitions.lock().unwrap().as_slice(),
|
||||
&[
|
||||
(ConnectionState::Init, ConnectionState::Busy),
|
||||
(ConnectionState::Busy, ConnectionState::Idle),
|
||||
(ConnectionState::Idle, ConnectionState::Busy),
|
||||
(ConnectionState::Busy, ConnectionState::Idle),
|
||||
]
|
||||
);
|
||||
}
|
||||
|
||||
struct TestStream {
|
||||
stream: BufReader<&'static [u8]>,
|
||||
}
|
||||
impl TestStream {
|
||||
fn new(data: &'static [u8]) -> Self {
|
||||
TestStream {
|
||||
stream: BufReader::new(data),
|
||||
}
|
||||
}
|
||||
}
|
||||
impl AsyncRead for TestStream {
|
||||
fn poll_read(
|
||||
mut self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
buf: &mut ReadBuf<'_>,
|
||||
) -> Poll<io::Result<()>> {
|
||||
pin!(&mut self.stream).poll_read(cx, buf)
|
||||
}
|
||||
}
|
||||
impl AsyncWrite for TestStream {
|
||||
fn poll_write(
|
||||
self: Pin<&mut Self>,
|
||||
_cx: &mut Context<'_>,
|
||||
buf: &[u8],
|
||||
) -> Poll<Result<usize, io::Error>> {
|
||||
Poll::Ready(Ok(buf.len()))
|
||||
}
|
||||
fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
|
||||
Poll::Ready(Ok(()))
|
||||
}
|
||||
fn poll_shutdown(
|
||||
self: Pin<&mut Self>,
|
||||
_cx: &mut Context<'_>,
|
||||
) -> Poll<Result<(), io::Error>> {
|
||||
Poll::Ready(Ok(()))
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -2,7 +2,6 @@
|
||||
mod tests;
|
||||
|
||||
pub(crate) mod connect_compute;
|
||||
pub mod conntrack;
|
||||
mod copy_bidirectional;
|
||||
pub(crate) mod handshake;
|
||||
pub(crate) mod passthrough;
|
||||
@@ -31,7 +30,6 @@ use crate::context::RequestContext;
|
||||
use crate::error::ReportableError;
|
||||
use crate::metrics::{Metrics, NumClientConnectionsGuard};
|
||||
use crate::protocol2::{ConnectHeader, ConnectionInfo, ConnectionInfoExtra, read_proxy_protocol};
|
||||
use crate::proxy::conntrack::ConnectionTracking;
|
||||
use crate::proxy::handshake::{HandshakeData, handshake};
|
||||
use crate::rate_limiter::EndpointRateLimiter;
|
||||
use crate::stream::{PqStream, Stream};
|
||||
@@ -62,7 +60,6 @@ pub async fn task_main(
|
||||
cancellation_token: CancellationToken,
|
||||
cancellation_handler: Arc<CancellationHandler>,
|
||||
endpoint_rate_limiter: Arc<EndpointRateLimiter>,
|
||||
conntracking: Arc<ConnectionTracking>,
|
||||
) -> anyhow::Result<()> {
|
||||
scopeguard::defer! {
|
||||
info!("proxy has shut down");
|
||||
@@ -88,7 +85,6 @@ pub async fn task_main(
|
||||
let session_id = uuid::Uuid::new_v4();
|
||||
let cancellation_handler = Arc::clone(&cancellation_handler);
|
||||
let cancellations = cancellations.clone();
|
||||
let conntracking = Arc::clone(&conntracking);
|
||||
|
||||
debug!(protocol = "tcp", %session_id, "accepted new TCP connection");
|
||||
let endpoint_rate_limiter2 = endpoint_rate_limiter.clone();
|
||||
@@ -153,7 +149,6 @@ pub async fn task_main(
|
||||
endpoint_rate_limiter2,
|
||||
conn_gauge,
|
||||
cancellations,
|
||||
conntracking,
|
||||
)
|
||||
.instrument(ctx.span())
|
||||
.boxed()
|
||||
@@ -273,7 +268,6 @@ pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
|
||||
endpoint_rate_limiter: Arc<EndpointRateLimiter>,
|
||||
conn_gauge: NumClientConnectionsGuard<'static>,
|
||||
cancellations: tokio_util::task::task_tracker::TaskTracker,
|
||||
conntracking: Arc<ConnectionTracking>,
|
||||
) -> Result<Option<ProxyPassthrough<S>>, ClientRequestError> {
|
||||
debug!(
|
||||
protocol = %ctx.protocol(),
|
||||
@@ -415,7 +409,6 @@ pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
|
||||
compute: node,
|
||||
session_id: ctx.session_id(),
|
||||
cancel: session,
|
||||
conntracking,
|
||||
_req: request_gauge,
|
||||
_conn: conn_gauge,
|
||||
}))
|
||||
|
||||
@@ -1,5 +1,3 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use smol_str::SmolStr;
|
||||
use tokio::io::{AsyncRead, AsyncWrite};
|
||||
use tracing::debug;
|
||||
@@ -11,7 +9,6 @@ use crate::compute::PostgresConnection;
|
||||
use crate::config::ComputeConfig;
|
||||
use crate::control_plane::messages::MetricsAuxInfo;
|
||||
use crate::metrics::{Direction, Metrics, NumClientConnectionsGuard, NumConnectionRequestsGuard};
|
||||
use crate::proxy::conntrack::{ConnectionTracking, TrackedStream};
|
||||
use crate::stream::Stream;
|
||||
use crate::usage_metrics::{Ids, MetricCounterRecorder, USAGE_METRICS};
|
||||
|
||||
@@ -22,7 +19,6 @@ pub(crate) async fn proxy_pass(
|
||||
compute: impl AsyncRead + AsyncWrite + Unpin,
|
||||
aux: MetricsAuxInfo,
|
||||
private_link_id: Option<SmolStr>,
|
||||
conntracking: &Arc<ConnectionTracking>,
|
||||
) -> Result<(), ErrorSource> {
|
||||
// we will report ingress at a later date
|
||||
let usage_tx = USAGE_METRICS.register(Ids {
|
||||
@@ -31,11 +27,9 @@ pub(crate) async fn proxy_pass(
|
||||
private_link_id,
|
||||
});
|
||||
|
||||
let conn_tracker = conntracking.new_tracker();
|
||||
|
||||
let metrics = &Metrics::get().proxy.io_bytes;
|
||||
let m_sent = metrics.with_labels(Direction::Tx);
|
||||
let client = MeasuredStream::new(
|
||||
let mut client = MeasuredStream::new(
|
||||
client,
|
||||
|_| {},
|
||||
|cnt| {
|
||||
@@ -44,10 +38,9 @@ pub(crate) async fn proxy_pass(
|
||||
usage_tx.record_egress(cnt as u64);
|
||||
},
|
||||
);
|
||||
let mut client = TrackedStream::new(client, true, |tag| conn_tracker.frontend_message_tag(tag));
|
||||
|
||||
let m_recv = metrics.with_labels(Direction::Rx);
|
||||
let compute = MeasuredStream::new(
|
||||
let mut compute = MeasuredStream::new(
|
||||
compute,
|
||||
|_| {},
|
||||
|cnt| {
|
||||
@@ -56,8 +49,6 @@ pub(crate) async fn proxy_pass(
|
||||
usage_tx.record_ingress(cnt as u64);
|
||||
},
|
||||
);
|
||||
let mut compute =
|
||||
TrackedStream::new(compute, true, |tag| conn_tracker.backend_message_tag(tag));
|
||||
|
||||
// Starting from here we only proxy the client's traffic.
|
||||
debug!("performing the proxy pass...");
|
||||
@@ -77,7 +68,6 @@ pub(crate) struct ProxyPassthrough<S> {
|
||||
pub(crate) session_id: uuid::Uuid,
|
||||
pub(crate) private_link_id: Option<SmolStr>,
|
||||
pub(crate) cancel: cancellation::Session,
|
||||
pub(crate) conntracking: Arc<ConnectionTracking>,
|
||||
|
||||
pub(crate) _req: NumConnectionRequestsGuard<'static>,
|
||||
pub(crate) _conn: NumClientConnectionsGuard<'static>,
|
||||
@@ -93,7 +83,6 @@ impl<S: AsyncRead + AsyncWrite + Unpin> ProxyPassthrough<S> {
|
||||
self.compute.stream,
|
||||
self.aux,
|
||||
self.private_link_id,
|
||||
&self.conntracking,
|
||||
)
|
||||
.await;
|
||||
if let Err(err) = self
|
||||
|
||||
@@ -50,7 +50,6 @@ use crate::context::RequestContext;
|
||||
use crate::ext::TaskExt;
|
||||
use crate::metrics::Metrics;
|
||||
use crate::protocol2::{ChainRW, ConnectHeader, ConnectionInfo, read_proxy_protocol};
|
||||
use crate::proxy::conntrack::ConnectionTracking;
|
||||
use crate::proxy::run_until_cancelled;
|
||||
use crate::rate_limiter::EndpointRateLimiter;
|
||||
use crate::serverless::backend::PoolingBackend;
|
||||
@@ -125,9 +124,6 @@ pub async fn task_main(
|
||||
connections.close(); // allows `connections.wait to complete`
|
||||
|
||||
let cancellations = tokio_util::task::task_tracker::TaskTracker::new();
|
||||
|
||||
let conntracking = Arc::new(ConnectionTracking::default());
|
||||
|
||||
while let Some(res) = run_until_cancelled(ws_listener.accept(), &cancellation_token).await {
|
||||
let (conn, peer_addr) = res.context("could not accept TCP stream")?;
|
||||
if let Err(e) = conn.set_nodelay(true) {
|
||||
@@ -157,8 +153,6 @@ pub async fn task_main(
|
||||
let cancellation_handler = cancellation_handler.clone();
|
||||
let endpoint_rate_limiter = endpoint_rate_limiter.clone();
|
||||
let cancellations = cancellations.clone();
|
||||
let conntracking = Arc::clone(&conntracking);
|
||||
|
||||
connections.spawn(
|
||||
async move {
|
||||
let conn_token2 = conn_token.clone();
|
||||
@@ -191,7 +185,6 @@ pub async fn task_main(
|
||||
cancellation_handler,
|
||||
endpoint_rate_limiter,
|
||||
conn_token,
|
||||
conntracking,
|
||||
conn,
|
||||
conn_info,
|
||||
session_id,
|
||||
@@ -316,7 +309,6 @@ async fn connection_handler(
|
||||
cancellation_handler: Arc<CancellationHandler>,
|
||||
endpoint_rate_limiter: Arc<EndpointRateLimiter>,
|
||||
cancellation_token: CancellationToken,
|
||||
conntracking: Arc<ConnectionTracking>,
|
||||
conn: AsyncRW,
|
||||
conn_info: ConnectionInfo,
|
||||
session_id: uuid::Uuid,
|
||||
@@ -355,7 +347,6 @@ async fn connection_handler(
|
||||
// `request_handler` is not cancel safe. It expects to be cancelled only at specific times.
|
||||
// By spawning the future, we ensure it never gets cancelled until it decides to.
|
||||
let cancellations = cancellations.clone();
|
||||
let conntracking = Arc::clone(&conntracking);
|
||||
let handler = connections.spawn(
|
||||
request_handler(
|
||||
req,
|
||||
@@ -368,7 +359,6 @@ async fn connection_handler(
|
||||
http_request_token,
|
||||
endpoint_rate_limiter.clone(),
|
||||
cancellations,
|
||||
conntracking,
|
||||
)
|
||||
.in_current_span()
|
||||
.map_ok_or_else(api_error_into_response, |r| r),
|
||||
@@ -417,7 +407,6 @@ async fn request_handler(
|
||||
http_cancellation_token: CancellationToken,
|
||||
endpoint_rate_limiter: Arc<EndpointRateLimiter>,
|
||||
cancellations: TaskTracker,
|
||||
conntracking: Arc<ConnectionTracking>,
|
||||
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, ApiError> {
|
||||
let host = request
|
||||
.headers()
|
||||
@@ -463,7 +452,6 @@ async fn request_handler(
|
||||
endpoint_rate_limiter,
|
||||
host,
|
||||
cancellations,
|
||||
conntracking,
|
||||
)
|
||||
.await
|
||||
{
|
||||
|
||||
@@ -17,7 +17,6 @@ use crate::config::ProxyConfig;
|
||||
use crate::context::RequestContext;
|
||||
use crate::error::ReportableError;
|
||||
use crate::metrics::Metrics;
|
||||
use crate::proxy::conntrack::ConnectionTracking;
|
||||
use crate::proxy::{ClientMode, ErrorSource, handle_client};
|
||||
use crate::rate_limiter::EndpointRateLimiter;
|
||||
|
||||
@@ -134,7 +133,6 @@ pub(crate) async fn serve_websocket(
|
||||
endpoint_rate_limiter: Arc<EndpointRateLimiter>,
|
||||
hostname: Option<String>,
|
||||
cancellations: tokio_util::task::task_tracker::TaskTracker,
|
||||
conntracking: Arc<ConnectionTracking>,
|
||||
) -> anyhow::Result<()> {
|
||||
let websocket = websocket.await?;
|
||||
let websocket = WebSocketServer::after_handshake(TokioIo::new(websocket));
|
||||
@@ -154,7 +152,6 @@ pub(crate) async fn serve_websocket(
|
||||
endpoint_rate_limiter,
|
||||
conn_gauge,
|
||||
cancellations,
|
||||
conntracking,
|
||||
))
|
||||
.await;
|
||||
|
||||
|
||||
@@ -55,7 +55,6 @@ tokio-util = { workspace = true }
|
||||
tracing.workspace = true
|
||||
url.workspace = true
|
||||
metrics.workspace = true
|
||||
pem.workspace = true
|
||||
postgres_backend.workspace = true
|
||||
postgres_ffi.workspace = true
|
||||
pq_proto.workspace = true
|
||||
|
||||
@@ -16,6 +16,7 @@ use futures::stream::FuturesUnordered;
|
||||
use futures::{FutureExt, StreamExt};
|
||||
use metrics::set_build_info_metric;
|
||||
use remote_storage::RemoteStorageConfig;
|
||||
use reqwest::Certificate;
|
||||
use safekeeper::defaults::{
|
||||
DEFAULT_CONTROL_FILE_SAVE_INTERVAL, DEFAULT_EVICTION_MIN_RESIDENT, DEFAULT_HEARTBEAT_TIMEOUT,
|
||||
DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_MAX_OFFLOADER_LAG_BYTES, DEFAULT_PARTIAL_BACKUP_CONCURRENCY,
|
||||
@@ -372,10 +373,7 @@ async fn main() -> anyhow::Result<()> {
|
||||
Some(ssl_ca_file) => {
|
||||
tracing::info!("Using ssl root CA file: {ssl_ca_file:?}");
|
||||
let buf = tokio::fs::read(ssl_ca_file).await?;
|
||||
pem::parse_many(&buf)?
|
||||
.into_iter()
|
||||
.filter(|pem| pem.tag() == "CERTIFICATE")
|
||||
.collect()
|
||||
Certificate::from_pem_bundle(&buf)?
|
||||
}
|
||||
None => Vec::new(),
|
||||
};
|
||||
|
||||
@@ -24,15 +24,6 @@ use crate::{GlobalTimelines, SafeKeeperConf};
|
||||
const RETRY_INTERVAL_MSEC: u64 = 1000;
|
||||
const PUSH_INTERVAL_MSEC: u64 = 1000;
|
||||
|
||||
fn make_tls_config(conf: &SafeKeeperConf) -> storage_broker::ClientTlsConfig {
|
||||
storage_broker::ClientTlsConfig::new().ca_certificates(
|
||||
conf.ssl_ca_certs
|
||||
.iter()
|
||||
.map(pem::encode)
|
||||
.map(storage_broker::Certificate::from_pem),
|
||||
)
|
||||
}
|
||||
|
||||
/// Push once in a while data about all active timelines to the broker.
|
||||
async fn push_loop(
|
||||
conf: Arc<SafeKeeperConf>,
|
||||
@@ -46,11 +37,8 @@ async fn push_loop(
|
||||
|
||||
let active_timelines_set = global_timelines.get_global_broker_active_set();
|
||||
|
||||
let mut client = storage_broker::connect(
|
||||
conf.broker_endpoint.clone(),
|
||||
conf.broker_keepalive_interval,
|
||||
make_tls_config(&conf),
|
||||
)?;
|
||||
let mut client =
|
||||
storage_broker::connect(conf.broker_endpoint.clone(), conf.broker_keepalive_interval)?;
|
||||
let push_interval = Duration::from_millis(PUSH_INTERVAL_MSEC);
|
||||
|
||||
let outbound = async_stream::stream! {
|
||||
@@ -93,11 +81,8 @@ async fn pull_loop(
|
||||
global_timelines: Arc<GlobalTimelines>,
|
||||
stats: Arc<BrokerStats>,
|
||||
) -> Result<()> {
|
||||
let mut client = storage_broker::connect(
|
||||
conf.broker_endpoint.clone(),
|
||||
conf.broker_keepalive_interval,
|
||||
make_tls_config(&conf),
|
||||
)?;
|
||||
let mut client =
|
||||
storage_broker::connect(conf.broker_endpoint.clone(), conf.broker_keepalive_interval)?;
|
||||
|
||||
// TODO: subscribe only to local timelines instead of all
|
||||
let request = SubscribeSafekeeperInfoRequest {
|
||||
@@ -149,11 +134,8 @@ async fn discover_loop(
|
||||
global_timelines: Arc<GlobalTimelines>,
|
||||
stats: Arc<BrokerStats>,
|
||||
) -> Result<()> {
|
||||
let mut client = storage_broker::connect(
|
||||
conf.broker_endpoint.clone(),
|
||||
conf.broker_keepalive_interval,
|
||||
make_tls_config(&conf),
|
||||
)?;
|
||||
let mut client =
|
||||
storage_broker::connect(conf.broker_endpoint.clone(), conf.broker_keepalive_interval)?;
|
||||
|
||||
let request = SubscribeByFilterRequest {
|
||||
types: vec![TypeSubscription {
|
||||
|
||||
@@ -14,7 +14,6 @@ use http_utils::json::{json_request, json_response};
|
||||
use http_utils::request::{ensure_no_body, parse_query_param, parse_request_param};
|
||||
use http_utils::{RequestExt, RouterBuilder};
|
||||
use hyper::{Body, Request, Response, StatusCode};
|
||||
use pem::Pem;
|
||||
use postgres_ffi::WAL_SEGMENT_SIZE;
|
||||
use safekeeper_api::models::{
|
||||
AcceptorStateStatus, PullTimelineRequest, SafekeeperStatus, SkTimelineInfo, TenantDeleteResult,
|
||||
@@ -231,20 +230,14 @@ async fn timeline_pull_handler(mut request: Request<Body>) -> Result<Response<Bo
|
||||
let conf = get_conf(&request);
|
||||
let global_timelines = get_global_timelines(&request);
|
||||
|
||||
let ca_certs = conf
|
||||
.ssl_ca_certs
|
||||
.iter()
|
||||
.map(Pem::contents)
|
||||
.map(reqwest::Certificate::from_der)
|
||||
.collect::<Result<Vec<_>, _>>()
|
||||
.map_err(|e| {
|
||||
ApiError::InternalServerError(anyhow::anyhow!("failed to parse CA certs: {e}"))
|
||||
})?;
|
||||
|
||||
let resp =
|
||||
pull_timeline::handle_request(data, conf.sk_auth_token.clone(), ca_certs, global_timelines)
|
||||
.await
|
||||
.map_err(ApiError::InternalServerError)?;
|
||||
let resp = pull_timeline::handle_request(
|
||||
data,
|
||||
conf.sk_auth_token.clone(),
|
||||
conf.ssl_ca_certs.clone(),
|
||||
global_timelines,
|
||||
)
|
||||
.await
|
||||
.map_err(ApiError::InternalServerError)?;
|
||||
json_response(StatusCode::OK, resp)
|
||||
}
|
||||
|
||||
|
||||
@@ -6,8 +6,8 @@ use std::time::Duration;
|
||||
|
||||
use camino::Utf8PathBuf;
|
||||
use once_cell::sync::Lazy;
|
||||
use pem::Pem;
|
||||
use remote_storage::RemoteStorageConfig;
|
||||
use reqwest::Certificate;
|
||||
use storage_broker::Uri;
|
||||
use tokio::runtime::Runtime;
|
||||
use utils::auth::SwappableJwtAuth;
|
||||
@@ -120,7 +120,7 @@ pub struct SafeKeeperConf {
|
||||
pub ssl_key_file: Utf8PathBuf,
|
||||
pub ssl_cert_file: Utf8PathBuf,
|
||||
pub ssl_cert_reload_period: Duration,
|
||||
pub ssl_ca_certs: Vec<Pem>,
|
||||
pub ssl_ca_certs: Vec<Certificate>,
|
||||
pub use_https_safekeeper_api: bool,
|
||||
}
|
||||
|
||||
|
||||
@@ -8,7 +8,6 @@ use std::time::SystemTime;
|
||||
use anyhow::{Context, bail};
|
||||
use futures::StreamExt;
|
||||
use postgres_protocol::message::backend::ReplicationMessage;
|
||||
use reqwest::Certificate;
|
||||
use safekeeper_api::Term;
|
||||
use safekeeper_api::membership::INVALID_GENERATION;
|
||||
use safekeeper_api::models::{PeerInfo, TimelineStatus};
|
||||
@@ -242,7 +241,7 @@ async fn recover(
|
||||
|
||||
let mut client = reqwest::Client::builder();
|
||||
for cert in &conf.ssl_ca_certs {
|
||||
client = client.add_root_certificate(Certificate::from_der(cert.contents())?);
|
||||
client = client.add_root_certificate(cert.clone());
|
||||
}
|
||||
let client = client
|
||||
.build()
|
||||
|
||||
@@ -87,12 +87,7 @@ fn tli_from_u64(i: u64) -> Vec<u8> {
|
||||
async fn subscribe(client: Option<BrokerClientChannel>, counter: Arc<AtomicU64>, i: u64) {
|
||||
let mut client = match client {
|
||||
Some(c) => c,
|
||||
None => storage_broker::connect(
|
||||
DEFAULT_ENDPOINT,
|
||||
Duration::from_secs(5),
|
||||
storage_broker::ClientTlsConfig::new(),
|
||||
)
|
||||
.unwrap(),
|
||||
None => storage_broker::connect(DEFAULT_ENDPOINT, Duration::from_secs(5)).unwrap(),
|
||||
};
|
||||
|
||||
let ttid = ProtoTenantTimelineId {
|
||||
@@ -124,12 +119,7 @@ async fn subscribe(client: Option<BrokerClientChannel>, counter: Arc<AtomicU64>,
|
||||
async fn publish(client: Option<BrokerClientChannel>, n_keys: u64) {
|
||||
let mut client = match client {
|
||||
Some(c) => c,
|
||||
None => storage_broker::connect(
|
||||
DEFAULT_ENDPOINT,
|
||||
Duration::from_secs(5),
|
||||
storage_broker::ClientTlsConfig::new(),
|
||||
)
|
||||
.unwrap(),
|
||||
None => storage_broker::connect(DEFAULT_ENDPOINT, Duration::from_secs(5)).unwrap(),
|
||||
};
|
||||
let mut counter: u64 = 0;
|
||||
|
||||
@@ -174,12 +164,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
}
|
||||
let h = tokio::spawn(progress_reporter(counters.clone()));
|
||||
|
||||
let c = storage_broker::connect(
|
||||
DEFAULT_ENDPOINT,
|
||||
Duration::from_secs(5),
|
||||
storage_broker::ClientTlsConfig::new(),
|
||||
)
|
||||
.unwrap();
|
||||
let c = storage_broker::connect(DEFAULT_ENDPOINT, Duration::from_secs(5)).unwrap();
|
||||
|
||||
for i in 0..args.num_subs {
|
||||
let c = Some(c.clone());
|
||||
|
||||
@@ -4,7 +4,7 @@ use proto::TenantTimelineId as ProtoTenantTimelineId;
|
||||
use proto::broker_service_client::BrokerServiceClient;
|
||||
use tonic::Status;
|
||||
use tonic::codegen::StdError;
|
||||
use tonic::transport::{Channel, Endpoint};
|
||||
use tonic::transport::{Channel, ClientTlsConfig, Endpoint};
|
||||
use utils::id::{TenantId, TenantTimelineId, TimelineId};
|
||||
|
||||
// Code generated by protobuf.
|
||||
@@ -20,7 +20,6 @@ pub mod metrics;
|
||||
|
||||
// Re-exports to avoid direct tonic dependency in user crates.
|
||||
pub use hyper::Uri;
|
||||
pub use tonic::transport::{Certificate, ClientTlsConfig};
|
||||
pub use tonic::{Code, Request, Streaming};
|
||||
|
||||
pub const DEFAULT_LISTEN_ADDR: &str = "127.0.0.1:50051";
|
||||
@@ -39,11 +38,7 @@ pub type BrokerClientChannel = BrokerServiceClient<Channel>;
|
||||
//
|
||||
// NB: this function is not async, but still must be run on a tokio runtime thread
|
||||
// because that's a requirement of tonic_endpoint.connect_lazy()'s Channel::new call.
|
||||
pub fn connect<U>(
|
||||
endpoint: U,
|
||||
keepalive_interval: Duration,
|
||||
tls_config: ClientTlsConfig,
|
||||
) -> anyhow::Result<BrokerClientChannel>
|
||||
pub fn connect<U>(endpoint: U, keepalive_interval: Duration) -> anyhow::Result<BrokerClientChannel>
|
||||
where
|
||||
U: std::convert::TryInto<Uri>,
|
||||
U::Error: std::error::Error + Send + Sync + 'static,
|
||||
@@ -59,7 +54,8 @@ where
|
||||
rustls::crypto::ring::default_provider()
|
||||
.install_default()
|
||||
.ok();
|
||||
tonic_endpoint = tonic_endpoint.tls_config(tls_config)?;
|
||||
let tls = ClientTlsConfig::new();
|
||||
tonic_endpoint = tonic_endpoint.tls_config(tls)?;
|
||||
}
|
||||
tonic_endpoint = tonic_endpoint
|
||||
.http2_keep_alive_interval(keepalive_interval)
|
||||
|
||||
@@ -126,7 +126,6 @@ pub(crate) enum DatabaseOperation {
|
||||
InsertTimelineReconcile,
|
||||
RemoveTimelineReconcile,
|
||||
ListTimelineReconcile,
|
||||
ListTimelineReconcileStartup,
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
@@ -1522,41 +1521,23 @@ impl Persistence {
|
||||
.await
|
||||
}
|
||||
|
||||
/// Load pending operations from db, joined together with timeline data.
|
||||
pub(crate) async fn list_pending_ops_with_timelines(
|
||||
/// Load pending operations from db.
|
||||
pub(crate) async fn list_pending_ops(
|
||||
&self,
|
||||
) -> DatabaseResult<Vec<(TimelinePendingOpPersistence, Option<TimelinePersistence>)>> {
|
||||
) -> DatabaseResult<Vec<TimelinePendingOpPersistence>> {
|
||||
use crate::schema::safekeeper_timeline_pending_ops::dsl;
|
||||
use crate::schema::timelines;
|
||||
|
||||
let timeline_from_db = self
|
||||
.with_measured_conn(
|
||||
DatabaseOperation::ListTimelineReconcileStartup,
|
||||
move |conn| {
|
||||
Box::pin(async move {
|
||||
let from_db: Vec<(TimelinePendingOpPersistence, Option<TimelineFromDb>)> =
|
||||
dsl::safekeeper_timeline_pending_ops
|
||||
.left_join(
|
||||
timelines::table.on(timelines::tenant_id
|
||||
.eq(dsl::tenant_id)
|
||||
.and(timelines::timeline_id.eq(dsl::timeline_id))),
|
||||
)
|
||||
.select((
|
||||
TimelinePendingOpPersistence::as_select(),
|
||||
Option::<TimelineFromDb>::as_select(),
|
||||
))
|
||||
.load(conn)
|
||||
.await?;
|
||||
Ok(from_db)
|
||||
})
|
||||
},
|
||||
)
|
||||
.with_measured_conn(DatabaseOperation::ListTimelineReconcile, move |conn| {
|
||||
Box::pin(async move {
|
||||
let from_db: Vec<TimelinePendingOpPersistence> =
|
||||
dsl::safekeeper_timeline_pending_ops.load(conn).await?;
|
||||
Ok(from_db)
|
||||
})
|
||||
})
|
||||
.await?;
|
||||
|
||||
Ok(timeline_from_db
|
||||
.into_iter()
|
||||
.map(|(op, tl_opt)| (op, tl_opt.map(|tl_opt| tl_opt.into_persistence())))
|
||||
.collect())
|
||||
Ok(timeline_from_db)
|
||||
}
|
||||
/// List pending operations for a given timeline (including tenant-global ones)
|
||||
pub(crate) async fn list_pending_ops_for_timeline(
|
||||
@@ -1599,7 +1580,7 @@ impl Persistence {
|
||||
|
||||
let tenant_id = &tenant_id;
|
||||
let timeline_id = &timeline_id;
|
||||
self.with_measured_conn(DatabaseOperation::RemoveTimelineReconcile, move |conn| {
|
||||
self.with_measured_conn(DatabaseOperation::ListTimelineReconcile, move |conn| {
|
||||
let timeline_id_str = timeline_id.map(|tid| tid.to_string()).unwrap_or_default();
|
||||
Box::pin(async move {
|
||||
diesel::delete(dsl::safekeeper_timeline_pending_ops)
|
||||
|
||||
@@ -824,13 +824,9 @@ impl Service {
|
||||
let mut locked = self.inner.write().unwrap();
|
||||
locked.become_leader();
|
||||
|
||||
for (sk_id, _sk) in locked.safekeepers.clone().iter() {
|
||||
locked.safekeeper_reconcilers.start_reconciler(*sk_id, self);
|
||||
}
|
||||
|
||||
locked
|
||||
.safekeeper_reconcilers
|
||||
.schedule_request_vec(sk_schedule_requests);
|
||||
.schedule_request_vec(self, sk_schedule_requests);
|
||||
}
|
||||
|
||||
// TODO: if any tenant's intent now differs from its loaded generation_pageserver, we should clear that
|
||||
|
||||
@@ -30,35 +30,31 @@ impl SafekeeperReconcilers {
|
||||
reconcilers: HashMap::new(),
|
||||
}
|
||||
}
|
||||
/// Adds a safekeeper-specific reconciler.
|
||||
/// Can be called multiple times, but it needs to be called at least once
|
||||
/// for every new safekeeper added.
|
||||
pub(crate) fn start_reconciler(&mut self, node_id: NodeId, service: &Arc<Service>) {
|
||||
self.reconcilers.entry(node_id).or_insert_with(|| {
|
||||
SafekeeperReconciler::spawn(self.cancel.child_token(), service.clone())
|
||||
});
|
||||
}
|
||||
/// Stop a safekeeper-specific reconciler.
|
||||
/// Stops the reconciler, cancelling all ongoing tasks.
|
||||
pub(crate) fn stop_reconciler(&mut self, node_id: NodeId) {
|
||||
if let Some(handle) = self.reconcilers.remove(&node_id) {
|
||||
handle.cancel.cancel();
|
||||
}
|
||||
}
|
||||
pub(crate) fn schedule_request_vec(&self, reqs: Vec<ScheduleRequest>) {
|
||||
pub(crate) fn schedule_request_vec(
|
||||
&mut self,
|
||||
service: &Arc<Service>,
|
||||
reqs: Vec<ScheduleRequest>,
|
||||
) {
|
||||
tracing::info!(
|
||||
"Scheduling {} pending safekeeper ops loaded from db",
|
||||
reqs.len()
|
||||
);
|
||||
for req in reqs {
|
||||
self.schedule_request(req);
|
||||
self.schedule_request(service, req);
|
||||
}
|
||||
}
|
||||
pub(crate) fn schedule_request(&self, req: ScheduleRequest) {
|
||||
pub(crate) fn schedule_request(&mut self, service: &Arc<Service>, req: ScheduleRequest) {
|
||||
let node_id = req.safekeeper.get_id();
|
||||
let reconciler_handle = self.reconcilers.get(&node_id).unwrap();
|
||||
let reconciler_handle = self.reconcilers.entry(node_id).or_insert_with(|| {
|
||||
SafekeeperReconciler::spawn(self.cancel.child_token(), service.clone())
|
||||
});
|
||||
reconciler_handle.schedule_reconcile(req);
|
||||
}
|
||||
pub(crate) fn cancel_safekeeper(&mut self, node_id: NodeId) {
|
||||
if let Some(handle) = self.reconcilers.remove(&node_id) {
|
||||
handle.cancel.cancel();
|
||||
}
|
||||
}
|
||||
/// Cancel ongoing reconciles for the given timeline
|
||||
///
|
||||
/// Specifying `None` here only removes reconciles for the tenant-global reconciliation,
|
||||
@@ -82,12 +78,9 @@ pub(crate) async fn load_schedule_requests(
|
||||
service: &Arc<Service>,
|
||||
safekeepers: &HashMap<NodeId, Safekeeper>,
|
||||
) -> anyhow::Result<Vec<ScheduleRequest>> {
|
||||
let pending_ops_timelines = service
|
||||
.persistence
|
||||
.list_pending_ops_with_timelines()
|
||||
.await?;
|
||||
let mut res = Vec::with_capacity(pending_ops_timelines.len());
|
||||
for (op_persist, timeline_persist) in pending_ops_timelines {
|
||||
let pending_ops = service.persistence.list_pending_ops().await?;
|
||||
let mut res = Vec::with_capacity(pending_ops.len());
|
||||
for op_persist in pending_ops {
|
||||
let node_id = NodeId(op_persist.sk_id as u64);
|
||||
let Some(sk) = safekeepers.get(&node_id) else {
|
||||
// This shouldn't happen, at least the safekeeper should exist as decomissioned.
|
||||
@@ -109,12 +102,16 @@ pub(crate) async fn load_schedule_requests(
|
||||
SafekeeperTimelineOpKind::Delete => Vec::new(),
|
||||
SafekeeperTimelineOpKind::Exclude => Vec::new(),
|
||||
SafekeeperTimelineOpKind::Pull => {
|
||||
if timeline_id.is_none() {
|
||||
// We only do this extra check (outside of timeline_persist check) to give better error msgs
|
||||
// TODO this code is super hacky, it doesn't take migrations into account
|
||||
let Some(timeline_id) = timeline_id else {
|
||||
anyhow::bail!(
|
||||
"timeline_id is empty for `pull` schedule request for {tenant_id}"
|
||||
);
|
||||
};
|
||||
let timeline_persist = service
|
||||
.persistence
|
||||
.get_timeline(tenant_id, timeline_id)
|
||||
.await?;
|
||||
let Some(timeline_persist) = timeline_persist else {
|
||||
// This shouldn't happen, the timeline should still exist
|
||||
tracing::warn!(
|
||||
@@ -166,7 +163,6 @@ pub(crate) struct ScheduleRequest {
|
||||
pub(crate) kind: SafekeeperTimelineOpKind,
|
||||
}
|
||||
|
||||
/// Handle to per safekeeper reconciler.
|
||||
struct ReconcilerHandle {
|
||||
tx: UnboundedSender<(ScheduleRequest, CancellationToken)>,
|
||||
ongoing_tokens: Arc<ClashMap<(TenantId, Option<TimelineId>), CancellationToken>>,
|
||||
@@ -174,10 +170,7 @@ struct ReconcilerHandle {
|
||||
}
|
||||
|
||||
impl ReconcilerHandle {
|
||||
/// Obtain a new token slot, cancelling any existing reconciliations for
|
||||
/// that timeline. It is not useful to have >1 operation per <tenant_id,
|
||||
/// timeline_id, safekeeper>, hence scheduling op cancels current one if it
|
||||
/// exists.
|
||||
/// Obtain a new token slot, cancelling any existing reconciliations for that timeline
|
||||
fn new_token_slot(
|
||||
&self,
|
||||
tenant_id: TenantId,
|
||||
@@ -312,16 +305,15 @@ impl SafekeeperReconciler {
|
||||
SafekeeperTimelineOpKind::Delete => {
|
||||
let tenant_id = req.tenant_id;
|
||||
if let Some(timeline_id) = req.timeline_id {
|
||||
let deleted = self
|
||||
.reconcile_inner(
|
||||
req,
|
||||
async |client| client.delete_timeline(tenant_id, timeline_id).await,
|
||||
|_resp| {
|
||||
tracing::info!("deleted timeline from {req_host}");
|
||||
},
|
||||
req_cancel,
|
||||
)
|
||||
.await;
|
||||
let deleted = self.reconcile_inner(
|
||||
req,
|
||||
async |client| client.delete_timeline(tenant_id, timeline_id).await,
|
||||
|_resp| {
|
||||
tracing::info!(%tenant_id, %timeline_id, "deleted timeline from {req_host}");
|
||||
},
|
||||
req_cancel,
|
||||
)
|
||||
.await;
|
||||
if deleted {
|
||||
self.delete_timeline_from_db(tenant_id, timeline_id).await;
|
||||
}
|
||||
@@ -352,13 +344,12 @@ impl SafekeeperReconciler {
|
||||
{
|
||||
Ok(list) => {
|
||||
if !list.is_empty() {
|
||||
// duplicate the timeline_id here because it might be None in the reconcile context
|
||||
tracing::info!(%timeline_id, "not deleting timeline from db as there is {} open reconciles", list.len());
|
||||
tracing::info!(%tenant_id, %timeline_id, "not deleting timeline from db as there is {} open reconciles", list.len());
|
||||
return;
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::warn!(%timeline_id, "couldn't query pending ops: {e}");
|
||||
tracing::warn!(%tenant_id, %timeline_id, "couldn't query pending ops: {e}");
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -46,7 +46,6 @@ impl Service {
|
||||
.map(SecretString::from);
|
||||
let mut joinset = JoinSet::new();
|
||||
|
||||
// Prepare membership::Configuration from choosen safekeepers.
|
||||
let safekeepers = {
|
||||
let locked = self.inner.read().unwrap();
|
||||
locked.safekeepers.clone()
|
||||
@@ -206,7 +205,7 @@ impl Service {
|
||||
tenant_id: tenant_id.to_string(),
|
||||
timeline_id: timeline_id.to_string(),
|
||||
start_lsn: start_lsn.into(),
|
||||
generation: 1,
|
||||
generation: 0,
|
||||
sk_set: sks_persistence.clone(),
|
||||
new_sk_set: None,
|
||||
cplane_notified_generation: 0,
|
||||
@@ -255,7 +254,7 @@ impl Service {
|
||||
self.persistence.insert_pending_op(pending_op).await?;
|
||||
}
|
||||
if !remaining.is_empty() {
|
||||
let locked = self.inner.read().unwrap();
|
||||
let mut locked = self.inner.write().unwrap();
|
||||
for remaining_id in remaining {
|
||||
let Some(sk) = locked.safekeepers.get(&remaining_id) else {
|
||||
return Err(ApiError::InternalServerError(anyhow::anyhow!(
|
||||
@@ -291,7 +290,7 @@ impl Service {
|
||||
generation: timeline_persist.generation as u32,
|
||||
kind: crate::persistence::SafekeeperTimelineOpKind::Pull,
|
||||
};
|
||||
locked.safekeeper_reconcilers.schedule_request(req);
|
||||
locked.safekeeper_reconcilers.schedule_request(self, req);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -358,7 +357,7 @@ impl Service {
|
||||
let pending_op = TimelinePendingOpPersistence {
|
||||
tenant_id: tenant_id.to_string(),
|
||||
timeline_id: timeline_id.to_string(),
|
||||
generation: i32::MAX,
|
||||
generation: tl.generation,
|
||||
op_kind: SafekeeperTimelineOpKind::Delete,
|
||||
sk_id: *sk_id,
|
||||
};
|
||||
@@ -366,7 +365,7 @@ impl Service {
|
||||
self.persistence.insert_pending_op(pending_op).await?;
|
||||
}
|
||||
{
|
||||
let locked = self.inner.read().unwrap();
|
||||
let mut locked = self.inner.write().unwrap();
|
||||
for sk_id in all_sks {
|
||||
let sk_id = NodeId(*sk_id as u64);
|
||||
let Some(sk) = locked.safekeepers.get(&sk_id) else {
|
||||
@@ -384,7 +383,7 @@ impl Service {
|
||||
generation: tl.generation as u32,
|
||||
kind: SafekeeperTimelineOpKind::Delete,
|
||||
};
|
||||
locked.safekeeper_reconcilers.schedule_request(req);
|
||||
locked.safekeeper_reconcilers.schedule_request(self, req);
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
@@ -483,7 +482,7 @@ impl Service {
|
||||
tenant_id,
|
||||
timeline_id: None,
|
||||
};
|
||||
locked.safekeeper_reconcilers.schedule_request(req);
|
||||
locked.safekeeper_reconcilers.schedule_request(self, req);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
@@ -580,7 +579,7 @@ impl Service {
|
||||
}
|
||||
|
||||
pub(crate) async fn upsert_safekeeper(
|
||||
self: &Arc<Service>,
|
||||
&self,
|
||||
record: crate::persistence::SafekeeperUpsert,
|
||||
) -> Result<(), ApiError> {
|
||||
let node_id = NodeId(record.id as u64);
|
||||
@@ -619,9 +618,6 @@ impl Service {
|
||||
);
|
||||
}
|
||||
}
|
||||
locked
|
||||
.safekeeper_reconcilers
|
||||
.start_reconciler(node_id, self);
|
||||
locked.safekeepers = Arc::new(safekeepers);
|
||||
metrics::METRICS_REGISTRY
|
||||
.metrics_group
|
||||
@@ -642,7 +638,7 @@ impl Service {
|
||||
}
|
||||
|
||||
pub(crate) async fn set_safekeeper_scheduling_policy(
|
||||
self: &Arc<Service>,
|
||||
&self,
|
||||
id: i64,
|
||||
scheduling_policy: SkSchedulingPolicy,
|
||||
) -> Result<(), DatabaseError> {
|
||||
@@ -660,13 +656,9 @@ impl Service {
|
||||
sk.set_scheduling_policy(scheduling_policy);
|
||||
|
||||
match scheduling_policy {
|
||||
SkSchedulingPolicy::Active => {
|
||||
locked
|
||||
.safekeeper_reconcilers
|
||||
.start_reconciler(node_id, self);
|
||||
}
|
||||
SkSchedulingPolicy::Active => (),
|
||||
SkSchedulingPolicy::Decomissioned | SkSchedulingPolicy::Pause => {
|
||||
locked.safekeeper_reconcilers.stop_reconciler(node_id);
|
||||
locked.safekeeper_reconcilers.cancel_safekeeper(node_id);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -22,62 +22,19 @@ def connection_parameters_to_env(params: dict[str, str]) -> dict[str, str]:
|
||||
}
|
||||
|
||||
|
||||
# Some API calls not yet implemented.
|
||||
# You may want to copy not-yet-implemented methods from the PR https://github.com/neondatabase/neon/pull/11305
|
||||
class NeonAPI:
|
||||
def __init__(self, neon_api_key: str, neon_api_base_url: str):
|
||||
self.__neon_api_key = neon_api_key
|
||||
self.__neon_api_base_url = neon_api_base_url.strip("/")
|
||||
self.retry_if_possible = False
|
||||
self.attempts = 10
|
||||
self.sleep_before_retry = 1
|
||||
self.retries524 = 0
|
||||
self.retries4xx = 0
|
||||
|
||||
def __request(self, method: str | bytes, endpoint: str, **kwargs: Any) -> requests.Response:
|
||||
kwargs["headers"] = kwargs.get("headers", {})
|
||||
if "headers" not in kwargs:
|
||||
kwargs["headers"] = {}
|
||||
kwargs["headers"]["Authorization"] = f"Bearer {self.__neon_api_key}"
|
||||
|
||||
for attempt in range(self.attempts):
|
||||
retry = False
|
||||
resp = requests.request(method, f"{self.__neon_api_base_url}{endpoint}", **kwargs)
|
||||
if resp.status_code >= 400:
|
||||
log.error(
|
||||
"%s %s returned a %d: %s",
|
||||
method,
|
||||
endpoint,
|
||||
resp.status_code,
|
||||
resp.text if resp.status_code != 524 else "CloudFlare error page",
|
||||
)
|
||||
else:
|
||||
log.debug("%s %s returned a %d: %s", method, endpoint, resp.status_code, resp.text)
|
||||
if not self.retry_if_possible:
|
||||
resp.raise_for_status()
|
||||
break
|
||||
elif resp.status_code >= 400:
|
||||
if resp.status_code == 422:
|
||||
if resp.json()["message"] == "branch not ready yet":
|
||||
retry = True
|
||||
self.retries4xx += 1
|
||||
elif resp.status_code == 423 and resp.json()["message"] in {
|
||||
"endpoint is in some transitive state, could not suspend",
|
||||
"project already has running conflicting operations, scheduling of new ones is prohibited",
|
||||
}:
|
||||
retry = True
|
||||
self.retries4xx += 1
|
||||
elif resp.status_code == 524:
|
||||
log.info("The request was timed out, trying to get operations")
|
||||
retry = True
|
||||
self.retries524 += 1
|
||||
if retry:
|
||||
log.info("Retrying, attempt %s/%s", attempt + 1, self.attempts)
|
||||
time.sleep(self.sleep_before_retry)
|
||||
continue
|
||||
else:
|
||||
resp.raise_for_status()
|
||||
break
|
||||
else:
|
||||
raise RuntimeError("Max retry count is reached")
|
||||
resp = requests.request(method, f"{self.__neon_api_base_url}{endpoint}", **kwargs)
|
||||
log.debug("%s %s returned a %d: %s", method, endpoint, resp.status_code, resp.text)
|
||||
resp.raise_for_status()
|
||||
|
||||
return resp
|
||||
|
||||
@@ -144,96 +101,6 @@ class NeonAPI:
|
||||
|
||||
return cast("dict[str, Any]", resp.json())
|
||||
|
||||
def create_branch(
|
||||
self,
|
||||
project_id: str,
|
||||
branch_name: str | None = None,
|
||||
parent_id: str | None = None,
|
||||
parent_lsn: str | None = None,
|
||||
parent_timestamp: str | None = None,
|
||||
protected: bool | None = None,
|
||||
archived: bool | None = None,
|
||||
init_source: str | None = None,
|
||||
add_endpoint=True,
|
||||
) -> dict[str, Any]:
|
||||
data: dict[str, Any] = {}
|
||||
if add_endpoint:
|
||||
data["endpoints"] = [{"type": "read_write"}]
|
||||
data["branch"] = {}
|
||||
if parent_id:
|
||||
data["branch"]["parent_id"] = parent_id
|
||||
if branch_name:
|
||||
data["branch"]["name"] = branch_name
|
||||
if parent_lsn is not None:
|
||||
data["branch"]["parent_lsn"] = parent_lsn
|
||||
if parent_timestamp is not None:
|
||||
data["branch"]["parent_timestamp"] = parent_timestamp
|
||||
if protected is not None:
|
||||
data["branch"]["protected"] = protected
|
||||
if init_source is not None:
|
||||
data["branch"]["init_source"] = init_source
|
||||
if archived is not None:
|
||||
data["branch"]["archived"] = archived
|
||||
if not data["branch"]:
|
||||
data.pop("branch")
|
||||
resp = self.__request(
|
||||
"POST",
|
||||
f"/projects/{project_id}/branches",
|
||||
headers={
|
||||
"Accept": "application/json",
|
||||
"Content-Type": "application/json",
|
||||
},
|
||||
json=data,
|
||||
)
|
||||
return cast("dict[str, Any]", resp.json())
|
||||
|
||||
def get_branch_details(self, project_id: str, branch_id: str) -> dict[str, Any]:
|
||||
resp = self.__request(
|
||||
"GET",
|
||||
f"/projects/{project_id}/branches/{branch_id}",
|
||||
headers={
|
||||
"Accept": "application/json",
|
||||
},
|
||||
)
|
||||
return cast("dict[str, Any]", resp.json())
|
||||
|
||||
def delete_branch(self, project_id: str, branch_id: str) -> dict[str, Any]:
|
||||
resp = self.__request(
|
||||
"DELETE",
|
||||
f"/projects/{project_id}/branches/{branch_id}",
|
||||
headers={
|
||||
"Accept": "application/json",
|
||||
},
|
||||
)
|
||||
return cast("dict[str, Any]", resp.json())
|
||||
|
||||
def restore_branch(
|
||||
self,
|
||||
project_id: str,
|
||||
branch_id: str,
|
||||
source_branch_id: str,
|
||||
source_lsn: str | None,
|
||||
source_timestamp: str | None,
|
||||
preserve_under_name: str | None,
|
||||
):
|
||||
data = {"source_branch_id": source_branch_id}
|
||||
if source_lsn:
|
||||
data["source_lsn"] = source_lsn
|
||||
if source_timestamp:
|
||||
data["source_timestamp"] = source_timestamp
|
||||
if preserve_under_name:
|
||||
data["preserve_under_name"] = preserve_under_name
|
||||
log.info("Data: %s", data)
|
||||
resp = self.__request(
|
||||
"POST",
|
||||
f"/projects/{project_id}/branches/{branch_id}/restore",
|
||||
headers={
|
||||
"Accept": "application/json",
|
||||
},
|
||||
json=data,
|
||||
)
|
||||
return cast("dict[str, Any]", resp.json())
|
||||
|
||||
def start_endpoint(
|
||||
self,
|
||||
project_id: str,
|
||||
@@ -309,10 +176,6 @@ class NeonAPI:
|
||||
|
||||
return cast("dict[str, Any]", resp.json())
|
||||
|
||||
def delete_endpoint(self, project_id: str, endpoint_id: str) -> dict[str, Any]:
|
||||
resp = self.__request("DELETE", f"/projects/{project_id}/endpoints/{endpoint_id}")
|
||||
return cast("dict[str,Any]", resp.json())
|
||||
|
||||
def get_connection_uri(
|
||||
self,
|
||||
project_id: str,
|
||||
|
||||
@@ -3185,7 +3185,6 @@ class PgBin:
|
||||
command: list[str],
|
||||
env: Env | None = None,
|
||||
cwd: str | Path | None = None,
|
||||
stderr_pipe: Any | None = None,
|
||||
) -> subprocess.Popen[Any]:
|
||||
"""
|
||||
Run one of the postgres binaries, not waiting for it to finish
|
||||
@@ -3203,9 +3202,7 @@ class PgBin:
|
||||
log.info(f"Running command '{' '.join(command)}'")
|
||||
env = self._build_env(env)
|
||||
self._log_env(env)
|
||||
return subprocess.Popen(
|
||||
command, env=env, cwd=cwd, stdout=subprocess.PIPE, stderr=stderr_pipe, text=True
|
||||
)
|
||||
return subprocess.Popen(command, env=env, cwd=cwd, stdout=subprocess.PIPE, text=True)
|
||||
|
||||
def run(
|
||||
self,
|
||||
|
||||
@@ -1,93 +0,0 @@
|
||||
# Random Operations Test for Neon Stability
|
||||
|
||||
## Problem Statement
|
||||
|
||||
Neon needs robust testing of Neon's stability to ensure reliability for users. The random operations test addresses this by continuously exercising the API with unpredictable sequences of operations, helping to identify edge cases and potential issues that might not be caught by deterministic tests.
|
||||
|
||||
### Key Components
|
||||
|
||||
#### 1. Class Structure
|
||||
|
||||
The test implements three main classes to model the Neon architecture:
|
||||
|
||||
- **NeonProject**: Represents a Neon project and manages the lifecycle of branches and endpoints
|
||||
- **NeonBranch**: Represents a branch within a project, with methods for creating child branches, endpoints, and performing point-in-time restores
|
||||
- **NeonEndpoint**: Represents an endpoint (connection point) for a branch, with methods for managing benchmarks
|
||||
|
||||
#### 2. Operations Tested
|
||||
|
||||
The test randomly performs the following operations with weighted probabilities:
|
||||
|
||||
- **Creating branches**
|
||||
- **Deleting branches**
|
||||
- **Adding read-only endpoints**
|
||||
- **Deleting read-only endpoints**
|
||||
- **Restoring branches to random points in time**
|
||||
|
||||
#### 3. Load Generation
|
||||
|
||||
Each branch and endpoint is loaded with `pgbench` to simulate real database workloads during testing. This ensures that the operations are performed against branches with actual data and ongoing transactions.
|
||||
|
||||
#### 4. Error Handling
|
||||
|
||||
The test includes robust error handling for various scenarios:
|
||||
- Branch limit exceeded
|
||||
- Connection timeouts
|
||||
- Control plane timeouts (HTTP 524 errors)
|
||||
- Benchmark failures
|
||||
|
||||
#### 5. CI Integration
|
||||
|
||||
The test is integrated into the CI pipeline via a GitHub workflow that runs daily, ensuring continuous validation of API stability.
|
||||
|
||||
## How It Works
|
||||
|
||||
1. The test creates a Neon project using the Public API
|
||||
2. It initializes the main branch with pgbench data
|
||||
3. It performs random operations according to the weighted probabilities
|
||||
4. During each operation, it checks that all running benchmarks are still operational
|
||||
5. The test cleans up by deleting the project at the end
|
||||
|
||||
## Configuration
|
||||
|
||||
The test can be configured with:
|
||||
- `RANDOM_SEED`: Set a specific random seed for reproducible test runs
|
||||
- `NEON_API_KEY`: API key for authentication
|
||||
- `NEON_API_BASE_URL`: Base URL for the API (defaults to staging environment)
|
||||
- `NUM_OPERATIONS`: The number of operations to be performed
|
||||
|
||||
## Running the Test
|
||||
|
||||
The test is designed to run in the CI environment but can also be executed locally:
|
||||
|
||||
```bash
|
||||
NEON_API_KEY=your_api_key ./scripts/pytest test_runner/random_ops/test_random_ops.py -m remote_cluster
|
||||
```
|
||||
|
||||
To run with a specific random seed for reproducibility:
|
||||
|
||||
```bash
|
||||
RANDOM_SEED=12345 NEON_API_KEY=your_api_key ./scripts/pytest test_runner/random_ops/test_random_ops.py -m remote_cluster
|
||||
```
|
||||
|
||||
To run with the custom number of operations:
|
||||
|
||||
```bash
|
||||
NUM_OPERATIONS=500 NEON_API_KEY=your_api_key ./scripts/pytest test_runner/random_ops/test_random_ops.py -m remote_cluster
|
||||
```
|
||||
|
||||
## Benefits
|
||||
|
||||
This test provides several key benefits:
|
||||
1. **Comprehensive API testing**: Exercises multiple API endpoints in combination
|
||||
2. **Edge case discovery**: Random sequences may uncover issues not found in deterministic tests
|
||||
3. **Stability validation**: Continuous execution helps ensure long-term API reliability
|
||||
4. **Regression prevention**: Detects if new changes break existing API functionality
|
||||
|
||||
## Future Improvements
|
||||
|
||||
Potential enhancements to the test could include:
|
||||
1. Adding more API operations, e.g. `reset_to_parent`, `snapshot`, etc
|
||||
2. Implementing more sophisticated load patterns
|
||||
3. Adding metrics collection to measure API performance
|
||||
4. Extending test duration for longer-term stability validation
|
||||
@@ -1,463 +0,0 @@
|
||||
"""
|
||||
Run the random API tests on the cloud instance of Neon
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
import random
|
||||
import subprocess
|
||||
import time
|
||||
from datetime import UTC, datetime, timedelta
|
||||
from typing import TYPE_CHECKING, Any
|
||||
|
||||
import pytest
|
||||
from fixtures.log_helper import log
|
||||
from requests import HTTPError
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from pathlib import Path
|
||||
|
||||
from fixtures.neon_api import NeonAPI
|
||||
from fixtures.neon_fixtures import PgBin
|
||||
from fixtures.pg_version import PgVersion
|
||||
|
||||
|
||||
class NeonEndpoint:
|
||||
"""
|
||||
Neon Endpoint
|
||||
Gets the output of the API call of an endpoint creation
|
||||
"""
|
||||
|
||||
def __init__(self, project: NeonProject, endpoint: dict[str, Any]):
|
||||
self.project: NeonProject = project
|
||||
self.id: str = endpoint["id"]
|
||||
# The branch endpoint belongs to
|
||||
self.branch: NeonBranch = project.branches[endpoint["branch_id"]]
|
||||
self.type: str = endpoint["type"]
|
||||
# add itself to the list of endpoints of the branch
|
||||
self.branch.endpoints[self.id] = self
|
||||
self.project.endpoints[self.id] = self
|
||||
self.host: str = endpoint["host"]
|
||||
self.benchmark: subprocess.Popen[Any] | None = None
|
||||
# The connection environment is used when running benchmark
|
||||
self.connect_env: dict[str, str] | None = None
|
||||
if self.branch.connect_env:
|
||||
self.connect_env = self.branch.connect_env.copy()
|
||||
self.connect_env["PGHOST"] = self.host
|
||||
|
||||
def delete(self):
|
||||
self.project.delete_endpoint(self.id)
|
||||
|
||||
def start_benchmark(self, clients=10):
|
||||
return self.project.start_benchmark(self.id, clients=clients)
|
||||
|
||||
def check_benchmark(self):
|
||||
self.project.check_benchmark(self.id)
|
||||
|
||||
def terminate_benchmark(self):
|
||||
self.project.terminate_benchmark(self.id)
|
||||
|
||||
|
||||
class NeonBranch:
|
||||
"""
|
||||
Neon Branch
|
||||
Gets the output of the API call of the Neon Public API call of a branch creation as a first parameter
|
||||
is_reset defines if the branch is a reset one i.e. created as a result of the reset API Call
|
||||
"""
|
||||
|
||||
def __init__(self, project, branch: dict[str, Any], is_reset=False):
|
||||
self.id: str = branch["branch"]["id"]
|
||||
self.desc = branch
|
||||
self.project: NeonProject = project
|
||||
self.neon_api: NeonAPI = project.neon_api
|
||||
self.project_id: str = branch["branch"]["project_id"]
|
||||
self.parent: NeonBranch | None = (
|
||||
self.project.branches[branch["branch"]["parent_id"]]
|
||||
if "parent_id" in branch["branch"]
|
||||
else None
|
||||
)
|
||||
if is_reset:
|
||||
self.project.reset_branches.add(self.id)
|
||||
elif self.parent:
|
||||
self.project.leaf_branches[self.id] = self
|
||||
if self.parent is not None and self.parent.id in self.project.leaf_branches:
|
||||
self.project.leaf_branches.pop(self.parent.id)
|
||||
self.project.branches[self.id] = self
|
||||
self.children: dict[str, NeonBranch] = {}
|
||||
if self.parent is not None:
|
||||
self.parent.children[self.id] = self
|
||||
self.endpoints: dict[str, NeonEndpoint] = {}
|
||||
self.connection_parameters: dict[str, str] | None = (
|
||||
branch["connection_uris"][0]["connection_parameters"]
|
||||
if "connection_uris" in branch
|
||||
else None
|
||||
)
|
||||
self.benchmark: subprocess.Popen[Any] | None = None
|
||||
self.updated_at: datetime = datetime.fromisoformat(branch["branch"]["updated_at"])
|
||||
self.connect_env: dict[str, str] | None = None
|
||||
if self.connection_parameters:
|
||||
self.connect_env = {
|
||||
"PGHOST": self.connection_parameters["host"],
|
||||
"PGUSER": self.connection_parameters["role"],
|
||||
"PGDATABASE": self.connection_parameters["database"],
|
||||
"PGPASSWORD": self.connection_parameters["password"],
|
||||
"PGSSLMODE": "require",
|
||||
}
|
||||
|
||||
def __str__(self):
|
||||
"""
|
||||
Prints the branch's name with all the predecessors
|
||||
(r) means the branch is a reset one
|
||||
"""
|
||||
return f"{self.id}{'(r)' if self.id in self.project.reset_branches else ''}, parent: {self.parent}"
|
||||
|
||||
def create_child_branch(self) -> NeonBranch | None:
|
||||
return self.project.create_branch(self.id)
|
||||
|
||||
def create_ro_endpoint(self) -> NeonEndpoint:
|
||||
return NeonEndpoint(
|
||||
self.project,
|
||||
self.neon_api.create_endpoint(self.project_id, self.id, "read_only", {})["endpoint"],
|
||||
)
|
||||
|
||||
def delete(self) -> None:
|
||||
self.project.delete_branch(self.id)
|
||||
|
||||
def start_benchmark(self, clients=10) -> subprocess.Popen[Any]:
|
||||
return self.project.start_benchmark(self.id, clients=clients)
|
||||
|
||||
def check_benchmark(self) -> None:
|
||||
self.project.check_benchmark(self.id)
|
||||
|
||||
def terminate_benchmark(self) -> None:
|
||||
self.project.terminate_benchmark(self.id)
|
||||
|
||||
def restore_random_time(self) -> None:
|
||||
"""
|
||||
Does PITR, i.e. calls the reset API call on the same branch to the random time in the past
|
||||
"""
|
||||
min_time = self.updated_at + timedelta(seconds=1)
|
||||
max_time = datetime.now(UTC) - timedelta(seconds=1)
|
||||
target_time = (min_time + (max_time - min_time) * random.random()).replace(microsecond=0)
|
||||
res = self.restore(
|
||||
self.id,
|
||||
source_timestamp=target_time.isoformat().replace("+00:00", "Z"),
|
||||
preserve_under_name=self.project.gen_restore_name(),
|
||||
)
|
||||
if res is None:
|
||||
return
|
||||
self.updated_at = datetime.fromisoformat(res["branch"]["updated_at"])
|
||||
parent_id: str = res["branch"]["parent_id"]
|
||||
# Creates an object for the parent branch
|
||||
# After the reset operation a new parent branch is created
|
||||
parent = NeonBranch(
|
||||
self.project, self.neon_api.get_branch_details(self.project_id, parent_id), True
|
||||
)
|
||||
self.project.branches[parent_id] = parent
|
||||
self.parent = parent
|
||||
parent.children[self.id] = self
|
||||
self.project.wait()
|
||||
|
||||
def restore(
|
||||
self,
|
||||
source_branch_id: str,
|
||||
source_lsn: str | None = None,
|
||||
source_timestamp: str | None = None,
|
||||
preserve_under_name: str | None = None,
|
||||
) -> dict[str, Any] | None:
|
||||
endpoints = [ep for ep in self.endpoints.values() if ep.type == "read_only"]
|
||||
# Terminate all the benchmarks running to prevent errors. Errors in benchmark during pgbench are expected
|
||||
for ep in endpoints:
|
||||
ep.terminate_benchmark()
|
||||
self.terminate_benchmark()
|
||||
try:
|
||||
res: dict[str, Any] = self.neon_api.restore_branch(
|
||||
self.project_id,
|
||||
self.id,
|
||||
source_branch_id,
|
||||
source_lsn,
|
||||
source_timestamp,
|
||||
preserve_under_name,
|
||||
)
|
||||
except HTTPError as he:
|
||||
if (
|
||||
he.response.status_code == 422
|
||||
and he.response.json()["code"] == "BRANCHES_LIMIT_EXCEEDED"
|
||||
):
|
||||
log.info("Branch limit exceeded, skipping")
|
||||
return None
|
||||
else:
|
||||
raise HTTPError(he) from he
|
||||
self.project.wait()
|
||||
self.start_benchmark()
|
||||
for ep in endpoints:
|
||||
ep.start_benchmark()
|
||||
return res
|
||||
|
||||
|
||||
class NeonProject:
|
||||
"""
|
||||
The project object
|
||||
Calls the Public API to create a Neon Project
|
||||
"""
|
||||
|
||||
def __init__(self, neon_api: NeonAPI, pg_bin: PgBin, pg_version: PgVersion):
|
||||
self.neon_api = neon_api
|
||||
self.pg_bin = pg_bin
|
||||
proj = self.neon_api.create_project(
|
||||
pg_version, f"Automatic random API test {os.getenv('GITHUB_RUN_ID')}"
|
||||
)
|
||||
self.id: str = proj["project"]["id"]
|
||||
self.name: str = proj["project"]["name"]
|
||||
self.connection_uri: str = proj["connection_uris"][0]["connection_uri"]
|
||||
self.connection_parameters: dict[str, str] = proj["connection_uris"][0][
|
||||
"connection_parameters"
|
||||
]
|
||||
self.pg_version: PgVersion = pg_version
|
||||
# Leaf branches are the branches, which do not have children
|
||||
self.leaf_branches: dict[str, NeonBranch] = {}
|
||||
self.branches: dict[str, NeonBranch] = {}
|
||||
self.reset_branches: set[str] = set()
|
||||
self.main_branch: NeonBranch = NeonBranch(self, proj)
|
||||
self.main_branch.connection_parameters = self.connection_parameters
|
||||
self.endpoints: dict[str, NeonEndpoint] = {}
|
||||
for endpoint in proj["endpoints"]:
|
||||
NeonEndpoint(self, endpoint)
|
||||
self.neon_api.wait_for_operation_to_finish(self.id)
|
||||
self.benchmarks: dict[str, subprocess.Popen[Any]] = {}
|
||||
self.restore_num: int = 0
|
||||
self.restart_pgbench_on_console_errors: bool = False
|
||||
|
||||
def delete(self):
|
||||
self.neon_api.delete_project(self.id)
|
||||
|
||||
def create_branch(self, parent_id: str | None = None) -> NeonBranch | None:
|
||||
self.wait()
|
||||
try:
|
||||
branch_def = self.neon_api.create_branch(self.id, parent_id=parent_id)
|
||||
except HTTPError as he:
|
||||
if (
|
||||
he.response.status_code == 422
|
||||
and he.response.json()["code"] == "BRANCHES_LIMIT_EXCEEDED"
|
||||
):
|
||||
log.info("Branch limit exceeded, skipping")
|
||||
return None
|
||||
else:
|
||||
raise HTTPError(he) from he
|
||||
new_branch = NeonBranch(self, branch_def)
|
||||
self.wait()
|
||||
return new_branch
|
||||
|
||||
def delete_branch(self, branch_id: str) -> None:
|
||||
parent = self.branches[branch_id].parent
|
||||
if not parent or branch_id == self.main_branch.id:
|
||||
raise RuntimeError("Cannot delete the main branch")
|
||||
if branch_id not in self.leaf_branches and branch_id not in self.reset_branches:
|
||||
raise RuntimeError(f"The branch {branch_id}, probably, has ancestors")
|
||||
if branch_id not in self.branches:
|
||||
raise RuntimeError(f"The branch with id {branch_id} is not found")
|
||||
endpoints_to_delete = [
|
||||
ep for ep in self.branches[branch_id].endpoints.values() if ep.type == "read_only"
|
||||
]
|
||||
for ep in endpoints_to_delete:
|
||||
ep.delete()
|
||||
if branch_id not in self.reset_branches:
|
||||
self.terminate_benchmark(branch_id)
|
||||
self.neon_api.delete_branch(self.id, branch_id)
|
||||
if len(parent.children) == 1 and parent.id != self.main_branch.id:
|
||||
self.leaf_branches[parent.id] = parent
|
||||
parent.children.pop(branch_id)
|
||||
if branch_id in self.leaf_branches:
|
||||
self.leaf_branches.pop(branch_id)
|
||||
else:
|
||||
self.reset_branches.remove(branch_id)
|
||||
self.branches.pop(branch_id)
|
||||
self.wait()
|
||||
if parent.id in self.reset_branches:
|
||||
parent.delete()
|
||||
|
||||
def delete_endpoint(self, endpoint_id: str) -> None:
|
||||
self.terminate_benchmark(endpoint_id)
|
||||
self.neon_api.delete_endpoint(self.id, endpoint_id)
|
||||
self.endpoints[endpoint_id].branch.endpoints.pop(endpoint_id)
|
||||
self.endpoints.pop(endpoint_id)
|
||||
self.wait()
|
||||
|
||||
def start_benchmark(self, target: str, clients: int = 10) -> subprocess.Popen[Any]:
|
||||
if target in self.benchmarks:
|
||||
raise RuntimeError(f"Benchmark was already started for {target}")
|
||||
is_endpoint = target.startswith("ep")
|
||||
read_only = is_endpoint and self.endpoints[target].type == "read_only"
|
||||
cmd = ["pgbench", f"-c{clients}", "-T10800", "-Mprepared"]
|
||||
if read_only:
|
||||
cmd.extend(["-S", "-n"])
|
||||
target_object = self.endpoints[target] if is_endpoint else self.branches[target]
|
||||
if target_object.connect_env is None:
|
||||
raise RuntimeError(f"The connection environment is not defined for {target}")
|
||||
log.info(
|
||||
"running pgbench on %s, cmd: %s, host: %s",
|
||||
target,
|
||||
cmd,
|
||||
target_object.connect_env["PGHOST"],
|
||||
)
|
||||
pgbench = self.pg_bin.run_nonblocking(
|
||||
cmd, env=target_object.connect_env, stderr_pipe=subprocess.PIPE
|
||||
)
|
||||
self.benchmarks[target] = pgbench
|
||||
target_object.benchmark = pgbench
|
||||
time.sleep(2)
|
||||
return pgbench
|
||||
|
||||
def check_all_benchmarks(self) -> None:
|
||||
for target in tuple(self.benchmarks.keys()):
|
||||
self.check_benchmark(target)
|
||||
|
||||
def check_benchmark(self, target) -> None:
|
||||
rc = self.benchmarks[target].poll()
|
||||
if rc is not None:
|
||||
_, err = self.benchmarks[target].communicate()
|
||||
log.error("STDERR: %s", err)
|
||||
# if the benchmark failed due to irresponsible Control plane,
|
||||
# just restart it
|
||||
if self.restart_pgbench_on_console_errors and (
|
||||
"ERROR: Couldn't connect to compute node" in err
|
||||
or "ERROR: Console request failed" in err
|
||||
):
|
||||
log.info("Restarting benchmark for %s", target)
|
||||
self.benchmarks.pop(target)
|
||||
self.start_benchmark(target)
|
||||
return
|
||||
raise RuntimeError(f"The benchmark for {target} ended with code {rc}")
|
||||
|
||||
def terminate_benchmark(self, target):
|
||||
log.info("Terminating the benchmark %s", target)
|
||||
target_endpoint = target.startswith("ep")
|
||||
self.check_benchmark(target)
|
||||
self.benchmarks[target].terminate()
|
||||
self.benchmarks.pop(target)
|
||||
if target_endpoint:
|
||||
self.endpoints[target].benchmark = None
|
||||
else:
|
||||
self.branches[target].benchmark = None
|
||||
|
||||
def wait(self):
|
||||
"""
|
||||
Wait for all the operations to be finished
|
||||
"""
|
||||
return self.neon_api.wait_for_operation_to_finish(self.id)
|
||||
|
||||
def gen_restore_name(self):
|
||||
self.restore_num += 1
|
||||
return f"restore{self.restore_num}"
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def setup_class(
|
||||
pg_version: PgVersion,
|
||||
pg_bin: PgBin,
|
||||
neon_api: NeonAPI,
|
||||
):
|
||||
neon_api.retry_if_possible = True
|
||||
project = NeonProject(neon_api, pg_bin, pg_version)
|
||||
log.info("Created a project with id %s, name %s", project.id, project.name)
|
||||
yield pg_bin, project
|
||||
log.info("Retried 524 errors: %s", neon_api.retries524)
|
||||
log.info("Retried 4xx errors: %s", neon_api.retries4xx)
|
||||
if neon_api.retries524 > 0:
|
||||
print(f"::warning::Retried on 524 error {neon_api.retries524} times")
|
||||
if neon_api.retries4xx > 0:
|
||||
print(f"::warning::Retried on 4xx error {neon_api.retries4xx} times")
|
||||
log.info("Removing the project")
|
||||
project.delete()
|
||||
|
||||
|
||||
def do_action(project: NeonProject, action: str) -> None:
|
||||
"""
|
||||
Runs the action
|
||||
"""
|
||||
log.info("Action: %s", action)
|
||||
if action == "new_branch":
|
||||
log.info("Trying to create a new branch")
|
||||
parent = project.branches[
|
||||
random.choice(list(set(project.branches.keys()) - project.reset_branches))
|
||||
]
|
||||
log.info("Parent: %s", parent)
|
||||
child = parent.create_child_branch()
|
||||
if child is None:
|
||||
return
|
||||
log.info("Created branch %s", child)
|
||||
child.start_benchmark()
|
||||
elif action == "delete_branch":
|
||||
if project.leaf_branches:
|
||||
target = random.choice(list(project.leaf_branches.values()))
|
||||
log.info("Trying to delete branch %s", target)
|
||||
target.delete()
|
||||
else:
|
||||
log.info("Leaf branches not found, skipping")
|
||||
elif action == "new_ro_endpoint":
|
||||
ep = random.choice(
|
||||
[br for br in project.branches.values() if br.id not in project.reset_branches]
|
||||
).create_ro_endpoint()
|
||||
log.info("Created the RO endpoint with id %s branch: %s", ep.id, ep.branch.id)
|
||||
ep.start_benchmark()
|
||||
elif action == "delete_ro_endpoint":
|
||||
ro_endpoints: list[NeonEndpoint] = [
|
||||
endpoint for endpoint in project.endpoints.values() if endpoint.type == "read_only"
|
||||
]
|
||||
if ro_endpoints:
|
||||
target_ep: NeonEndpoint = random.choice(ro_endpoints)
|
||||
target_ep.delete()
|
||||
log.info("endpoint %s deleted", target_ep.id)
|
||||
else:
|
||||
log.info("no read_only endpoints present, skipping")
|
||||
elif action == "restore_random_time":
|
||||
if project.leaf_branches:
|
||||
br: NeonBranch = random.choice(list(project.leaf_branches.values()))
|
||||
log.info("Restore %s", br)
|
||||
br.restore_random_time()
|
||||
else:
|
||||
log.info("No leaf branches found")
|
||||
else:
|
||||
raise ValueError(f"The action {action} is unknown")
|
||||
|
||||
|
||||
@pytest.mark.timeout(7200)
|
||||
@pytest.mark.remote_cluster
|
||||
def test_api_random(
|
||||
setup_class,
|
||||
pg_distrib_dir: Path,
|
||||
test_output_dir: Path,
|
||||
):
|
||||
"""
|
||||
Run the random API tests
|
||||
"""
|
||||
if seed_env := os.getenv("RANDOM_SEED"):
|
||||
seed = int(seed_env)
|
||||
else:
|
||||
seed = 0
|
||||
if seed == 0:
|
||||
seed = int(time.time())
|
||||
log.info("Using random seed: %s", seed)
|
||||
random.seed(seed)
|
||||
pg_bin, project = setup_class
|
||||
# Here we can assign weights
|
||||
ACTIONS = (
|
||||
("new_branch", 1.5),
|
||||
("new_ro_endpoint", 1.4),
|
||||
("delete_ro_endpoint", 0.8),
|
||||
("delete_branch", 1.0),
|
||||
("restore_random_time", 1.2),
|
||||
)
|
||||
if num_ops_env := os.getenv("NUM_OPERATIONS"):
|
||||
num_operations = int(num_ops_env)
|
||||
else:
|
||||
num_operations = 250
|
||||
pg_bin.run(["pgbench", "-i", "-I", "dtGvp", "-s100"], env=project.main_branch.connect_env)
|
||||
for _ in range(num_operations):
|
||||
log.info("Starting action #%s", _ + 1)
|
||||
do_action(
|
||||
project, random.choices([a[0] for a in ACTIONS], weights=[w[1] for w in ACTIONS])[0]
|
||||
)
|
||||
project.check_all_benchmarks()
|
||||
assert True
|
||||
@@ -4114,29 +4114,13 @@ def test_storage_controller_location_conf_equivalence(neon_env_builder: NeonEnvB
|
||||
assert reconciles_after_restart == 0
|
||||
|
||||
|
||||
class RestartStorcon(Enum):
|
||||
RESTART = "restart"
|
||||
ONLINE = "online"
|
||||
|
||||
|
||||
class DeletionSubject(Enum):
|
||||
TIMELINE = "timeline"
|
||||
TENANT = "tenant"
|
||||
|
||||
|
||||
@run_only_on_default_postgres("PG version is not interesting here")
|
||||
@pytest.mark.parametrize("restart_storcon", [RestartStorcon.RESTART, RestartStorcon.ONLINE])
|
||||
@pytest.mark.parametrize("deletetion_subject", [DeletionSubject.TENANT, DeletionSubject.TIMELINE])
|
||||
def test_storcon_create_delete_sk_down(
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
restart_storcon: RestartStorcon,
|
||||
deletetion_subject: DeletionSubject,
|
||||
):
|
||||
@pytest.mark.parametrize("restart_storcon", [True, False])
|
||||
def test_storcon_create_delete_sk_down(neon_env_builder: NeonEnvBuilder, restart_storcon: bool):
|
||||
"""
|
||||
Test that the storcon can create and delete tenants and timelines with a safekeeper being down.
|
||||
- restart_storcon: tests that the pending ops are persisted.
|
||||
- restart_storcon: tests whether the pending ops are persisted.
|
||||
if we don't restart, we test that we don't require it to come from the db.
|
||||
- deletion_subject: test that both single timeline and whole tenant deletion work.
|
||||
"""
|
||||
|
||||
neon_env_builder.num_safekeepers = 3
|
||||
@@ -4159,7 +4143,6 @@ def test_storcon_create_delete_sk_down(
|
||||
tenant_id = TenantId.generate()
|
||||
timeline_id = TimelineId.generate()
|
||||
env.create_tenant(tenant_id, timeline_id)
|
||||
child_timeline_id = env.create_branch("child_of_main", tenant_id)
|
||||
|
||||
env.safekeepers[1].assert_log_contains(f"creating new timeline {tenant_id}/{timeline_id}")
|
||||
env.safekeepers[2].assert_log_contains(f"creating new timeline {tenant_id}/{timeline_id}")
|
||||
@@ -4172,7 +4155,7 @@ def test_storcon_create_delete_sk_down(
|
||||
]
|
||||
)
|
||||
|
||||
if restart_storcon == RestartStorcon.RESTART:
|
||||
if restart_storcon:
|
||||
# Restart the storcon to check that we persist operations
|
||||
env.storage_controller.stop()
|
||||
env.storage_controller.start()
|
||||
@@ -4185,13 +4168,6 @@ def test_storcon_create_delete_sk_down(
|
||||
ep.start(safekeeper_generation=1, safekeepers=[1, 2, 3])
|
||||
ep.safe_psql("CREATE TABLE IF NOT EXISTS t(key int, value text)")
|
||||
|
||||
with env.endpoints.create(
|
||||
"child_of_main", tenant_id=tenant_id, config_lines=config_lines
|
||||
) as ep:
|
||||
# endpoint should start.
|
||||
ep.start(safekeeper_generation=1, safekeepers=[1, 2, 3])
|
||||
ep.safe_psql("CREATE TABLE IF NOT EXISTS t(key int, value text)")
|
||||
|
||||
env.storage_controller.assert_log_contains("writing pending op for sk id 1")
|
||||
env.safekeepers[0].start()
|
||||
|
||||
@@ -4200,31 +4176,25 @@ def test_storcon_create_delete_sk_down(
|
||||
env.safekeepers[0].assert_log_contains(
|
||||
f"pulling timeline {tenant_id}/{timeline_id} from safekeeper"
|
||||
)
|
||||
env.safekeepers[0].assert_log_contains(
|
||||
f"pulling timeline {tenant_id}/{child_timeline_id} from safekeeper"
|
||||
)
|
||||
|
||||
wait_until(logged_contains_on_sk)
|
||||
|
||||
env.safekeepers[1].stop()
|
||||
|
||||
if deletetion_subject == DeletionSubject.TENANT:
|
||||
env.storage_controller.pageserver_api().tenant_delete(tenant_id)
|
||||
else:
|
||||
env.storage_controller.pageserver_api().timeline_delete(tenant_id, child_timeline_id)
|
||||
env.storage_controller.pageserver_api().tenant_delete(tenant_id)
|
||||
|
||||
# ensure the safekeeper deleted the timeline
|
||||
def timeline_deleted_on_active_sks():
|
||||
env.safekeepers[0].assert_log_contains(
|
||||
f"deleting timeline {tenant_id}/{child_timeline_id} from disk"
|
||||
f"deleting timeline {tenant_id}/{timeline_id} from disk"
|
||||
)
|
||||
env.safekeepers[2].assert_log_contains(
|
||||
f"deleting timeline {tenant_id}/{child_timeline_id} from disk"
|
||||
f"deleting timeline {tenant_id}/{timeline_id} from disk"
|
||||
)
|
||||
|
||||
wait_until(timeline_deleted_on_active_sks)
|
||||
|
||||
if restart_storcon == RestartStorcon.RESTART:
|
||||
if restart_storcon:
|
||||
# Restart the storcon to check that we persist operations
|
||||
env.storage_controller.stop()
|
||||
env.storage_controller.start()
|
||||
@@ -4234,7 +4204,7 @@ def test_storcon_create_delete_sk_down(
|
||||
# ensure that there is log msgs for the third safekeeper too
|
||||
def timeline_deleted_on_sk():
|
||||
env.safekeepers[1].assert_log_contains(
|
||||
f"deleting timeline {tenant_id}/{child_timeline_id} from disk"
|
||||
f"deleting timeline {tenant_id}/{timeline_id} from disk"
|
||||
)
|
||||
|
||||
wait_until(timeline_deleted_on_sk)
|
||||
|
||||
Reference in New Issue
Block a user