Compare commits

..

15 Commits

Author SHA1 Message Date
Konstantin Knizhnik
1c4bed27be Resolve merge conflicts 2025-04-16 08:20:46 +03:00
Konstantin Knizhnik
fdf0f1bdc0 Fix rust formatting 2025-04-16 07:49:47 +03:00
Konstantin Knizhnik
0bdd388dd8 Make it possible to control lazy_sru_download through tenant config 2025-04-16 07:49:47 +03:00
Konstantin Knizhnik
712b4cf83c Update compute_tools/src/compute.rs
Co-authored-by: Heikki Linnakangas <heikki@neon.tech>
2025-04-16 07:49:46 +03:00
Konstantin Knizhnik
15b6bb5026 Update libs/compute_api/src/spec.rs
Co-authored-by: Heikki Linnakangas <heikki@neon.tech>
2025-04-16 07:49:44 +03:00
Konstantin Knizhnik
61d642e541 Update pageserver/src/page_service.rs
Co-authored-by: Heikki Linnakangas <heikki@neon.tech>
2025-04-16 07:48:59 +03:00
Konstantin Knizhnik
1d24b887b8 Refector construction of basebackup command 2025-04-16 07:48:58 +03:00
Konstantin Knizhnik
955175c791 Make clippy happy 2025-04-16 07:48:58 +03:00
Konstantin Knizhnik
5fb0bcdd6a Make clippy happy 2025-04-16 07:48:58 +03:00
Konstantin Knizhnik
f146fa86f8 Use lazy SLRU download for all timelines is feature flag is set 2025-04-16 07:48:56 +03:00
Konstantin Knizhnik
961008116b Use lazy SLRU download for all timelines is feature flag is set 2025-04-16 07:47:55 +03:00
Konstantin Knizhnik
42d2d3addc Fix checking lazy SLRU download condition 2025-04-16 07:45:35 +03:00
Konstantin Knizhnik
06d0bed566 Always update lazy_slru_download flag during basebackup 2025-04-16 07:45:35 +03:00
Konstantin Knizhnik
aa367e5d82 Add lazy_slru_download_threshold parameter to page server config 2025-04-16 07:45:33 +03:00
Konstantin Knizhnik
6b76e1c526 Add lazy_slru_download compute feature flag 2025-04-16 07:42:32 +03:00
164 changed files with 1346 additions and 2828 deletions

View File

@@ -19,7 +19,7 @@
!pageserver/
!pgxn/
!proxy/
!endpoint_storage/
!object_storage/
!storage_scrubber/
!safekeeper/
!storage_broker/

View File

@@ -113,6 +113,8 @@ runs:
TEST_OUTPUT: /tmp/test_output
BUILD_TYPE: ${{ inputs.build_type }}
COMPATIBILITY_SNAPSHOT_DIR: /tmp/compatibility_snapshot_pg${{ inputs.pg_version }}
ALLOW_BACKWARD_COMPATIBILITY_BREAKAGE: contains(github.event.pull_request.labels.*.name, 'backward compatibility breakage')
ALLOW_FORWARD_COMPATIBILITY_BREAKAGE: contains(github.event.pull_request.labels.*.name, 'forward compatibility breakage')
RERUN_FAILED: ${{ inputs.rerun_failed }}
PG_VERSION: ${{ inputs.pg_version }}
SANITIZERS: ${{ inputs.sanitizers }}
@@ -133,7 +135,6 @@ runs:
fi
PERF_REPORT_DIR="$(realpath test_runner/perf-report-local)"
echo "PERF_REPORT_DIR=${PERF_REPORT_DIR}" >> ${GITHUB_ENV}
rm -rf $PERF_REPORT_DIR
TEST_SELECTION="test_runner/${{ inputs.test_selection }}"
@@ -210,12 +211,11 @@ runs:
--verbose \
-rA $TEST_SELECTION $EXTRA_PARAMS
- name: Upload performance report
if: ${{ !cancelled() && inputs.save_perf_report == 'true' }}
shell: bash -euxo pipefail {0}
run: |
export REPORT_FROM="${PERF_REPORT_DIR}"
scripts/generate_and_push_perf_report.sh
if [[ "${{ inputs.save_perf_report }}" == "true" ]]; then
export REPORT_FROM="$PERF_REPORT_DIR"
export REPORT_TO="$PLATFORM"
scripts/generate_and_push_perf_report.sh
fi
- name: Upload compatibility snapshot
# Note, that we use `github.base_ref` which is a target branch for a PR

View File

@@ -272,13 +272,10 @@ jobs:
# run pageserver tests with different settings
for get_vectored_concurrent_io in sequential sidecar-task; do
for io_engine in std-fs tokio-epoll-uring ; do
for io_mode in buffered direct direct-rw ; do
NEON_PAGESERVER_UNIT_TEST_GET_VECTORED_CONCURRENT_IO=$get_vectored_concurrent_io \
NEON_PAGESERVER_UNIT_TEST_VIRTUAL_FILE_IOENGINE=$io_engine \
NEON_PAGESERVER_UNIT_TEST_VIRTUAL_FILE_IOMODE=$io_mode \
${cov_prefix} \
cargo nextest run $CARGO_FLAGS $CARGO_FEATURES -E 'package(pageserver)'
done
NEON_PAGESERVER_UNIT_TEST_GET_VECTORED_CONCURRENT_IO=$get_vectored_concurrent_io \
NEON_PAGESERVER_UNIT_TEST_VIRTUAL_FILE_IOENGINE=$io_engine \
${cov_prefix} \
cargo nextest run $CARGO_FLAGS $CARGO_FEATURES -E 'package(pageserver)'
done
done
@@ -349,7 +346,7 @@ jobs:
contents: read
statuses: write
needs: [ build-neon ]
runs-on: ${{ fromJSON(format('["self-hosted", "{0}"]', inputs.arch == 'arm64' && 'large-arm64' || 'large-metal')) }}
runs-on: ${{ fromJSON(format('["self-hosted", "{0}"]', inputs.arch == 'arm64' && 'large-arm64' || 'large')) }}
container:
image: ${{ inputs.build-tools-image }}
credentials:
@@ -395,7 +392,6 @@ jobs:
BUILD_TAG: ${{ inputs.build-tag }}
PAGESERVER_VIRTUAL_FILE_IO_ENGINE: tokio-epoll-uring
PAGESERVER_GET_VECTORED_CONCURRENT_IO: sidecar-task
PAGESERVER_VIRTUAL_FILE_IO_MODE: direct
USE_LFC: ${{ matrix.lfc_state == 'with-lfc' && 'true' || 'false' }}
# Temporary disable this step until we figure out why it's so flaky

View File

@@ -165,5 +165,5 @@ jobs:
GH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
CURRENT_SHA: ${{ github.sha }}
run: |
RELEASE_PR_RUN_ID=$(gh api "/repos/${GITHUB_REPOSITORY}/actions/runs?head_sha=$CURRENT_SHA" | jq '[.workflow_runs[] | select(.name == "Build and Test") | select(.head_branch | test("^rc/release.*$"; "s"))] | first | .id // ("Failed to find Build and Test run from RC PR!" | halt_error(1))')
RELEASE_PR_RUN_ID=$(gh api "/repos/${GITHUB_REPOSITORY}/actions/runs?head_sha=$CURRENT_SHA" | jq '[.workflow_runs[] | select(.name == "Build and Test") | select(.head_branch | test("^rc/release(-(proxy|compute))?/[0-9]{4}-[0-9]{2}-[0-9]{2}$"; "s"))] | first | .id // ("Failed to find Build and Test run from RC PR!" | halt_error(1))')
echo "release-pr-run-id=$RELEASE_PR_RUN_ID" | tee -a $GITHUB_OUTPUT

View File

@@ -323,8 +323,6 @@ jobs:
PERF_TEST_RESULT_CONNSTR: "${{ secrets.PERF_TEST_RESULT_CONNSTR }}"
TEST_RESULT_CONNSTR: "${{ secrets.REGRESS_TEST_RESULT_CONNSTR_NEW }}"
PAGESERVER_VIRTUAL_FILE_IO_ENGINE: tokio-epoll-uring
PAGESERVER_GET_VECTORED_CONCURRENT_IO: sidecar-task
PAGESERVER_VIRTUAL_FILE_IO_MODE: direct
SYNC_BETWEEN_TESTS: true
# XXX: no coverage data handling here, since benchmarks are run on release builds,
# while coverage is currently collected for the debug ones

View File

@@ -27,17 +27,15 @@ jobs:
- name: Fast forwarding
uses: sequoia-pgp/fast-forward@ea7628bedcb0b0b96e94383ada458d812fca4979
# See https://docs.github.com/en/graphql/reference/enums#mergestatestatus
if: ${{ contains(fromJSON('["clean", "unstable"]'), github.event.pull_request.mergeable_state) }}
if: ${{ github.event.pull_request.mergeable_state == 'clean' }}
with:
merge: true
comment: on-error
github_token: ${{ secrets.CI_ACCESS_TOKEN }}
- name: Comment if mergeable_state is not clean
if: ${{ !contains(fromJSON('["clean", "unstable"]'), github.event.pull_request.mergeable_state) }}
env:
GH_TOKEN: ${{ secrets.CI_ACCESS_TOKEN }}
if: ${{ github.event.pull_request.mergeable_state != 'clean' }}
run: |
gh pr comment ${{ github.event.pull_request.number }} \
--repo "${GITHUB_REPOSITORY}" \
--body "Not trying to forward pull-request, because \`mergeable_state\` is \`${{ github.event.pull_request.mergeable_state }}\`, not \`clean\` or \`unstable\`."
--body "Not trying to forward pull-request, because \`mergeable_state\` is \`${{ github.event.pull_request.mergeable_state }}\`, not \`clean\`."

View File

@@ -30,7 +30,7 @@ permissions:
statuses: write # require for posting a status update
env:
DEFAULT_PG_VERSION: 17
DEFAULT_PG_VERSION: 16
PLATFORM: neon-captest-new
AWS_DEFAULT_REGION: eu-central-1
@@ -42,8 +42,6 @@ jobs:
github-event-name: ${{ github.event_name }}
build-build-tools-image:
permissions:
packages: write
needs: [ check-permissions ]
uses: ./.github/workflows/build-build-tools-image.yml
secrets: inherit

View File

@@ -1,93 +0,0 @@
name: Random Operations Test
on:
schedule:
# * is a special character in YAML so you have to quote this string
# ┌───────────── minute (0 - 59)
# │ ┌───────────── hour (0 - 23)
# │ │ ┌───────────── day of the month (1 - 31)
# │ │ │ ┌───────────── month (1 - 12 or JAN-DEC)
# │ │ │ │ ┌───────────── day of the week (0 - 6 or SUN-SAT)
- cron: '23 */2 * * *' # runs every 2 hours
workflow_dispatch:
inputs:
random_seed:
type: number
description: 'The random seed'
required: false
default: 0
num_operations:
type: number
description: "The number of operations to test"
default: 250
defaults:
run:
shell: bash -euxo pipefail {0}
permissions: {}
env:
DEFAULT_PG_VERSION: 16
PLATFORM: neon-captest-new
AWS_DEFAULT_REGION: eu-central-1
jobs:
run-random-rests:
env:
POSTGRES_DISTRIB_DIR: /tmp/neon/pg_install
runs-on: small
permissions:
id-token: write
statuses: write
strategy:
fail-fast: false
matrix:
pg-version: [16, 17]
container:
image: ghcr.io/neondatabase/build-tools:pinned-bookworm
credentials:
username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }}
options: --init
steps:
- name: Harden the runner (Audit all outbound calls)
uses: step-security/harden-runner@4d991eb9b905ef189e4c376166672c3f2f230481 # v2.11.0
with:
egress-policy: audit
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
- name: Download Neon artifact
uses: ./.github/actions/download
with:
name: neon-${{ runner.os }}-${{ runner.arch }}-release-artifact
path: /tmp/neon/
prefix: latest
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
- name: Run tests
uses: ./.github/actions/run-python-test-set
with:
build_type: remote
test_selection: random_ops
run_in_parallel: false
extra_params: -m remote_cluster
pg_version: ${{ matrix.pg-version }}
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
env:
NEON_API_KEY: ${{ secrets.NEON_STAGING_API_KEY }}
RANDOM_SEED: ${{ inputs.random_seed }}
NUM_OPERATIONS: ${{ inputs.num_operations }}
- name: Create Allure report
if: ${{ !cancelled() }}
id: create-allure-report
uses: ./.github/actions/allure-report-generate
with:
store-test-results-into-db: true
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
env:
REGRESS_TEST_RESULT_CONNSTR_NEW: ${{ secrets.REGRESS_TEST_RESULT_CONNSTR_NEW }}

60
Cargo.lock generated
View File

@@ -1432,6 +1432,7 @@ dependencies = [
"pageserver_api",
"pageserver_client",
"pem",
"pkcs8 0.10.2",
"postgres_backend",
"postgres_connection",
"regex",
@@ -1441,7 +1442,6 @@ dependencies = [
"serde",
"serde_json",
"sha2",
"spki 0.7.3",
"storage_broker",
"thiserror 1.0.69",
"tokio",
@@ -2037,33 +2037,6 @@ dependencies = [
"zeroize",
]
[[package]]
name = "endpoint_storage"
version = "0.0.1"
dependencies = [
"anyhow",
"axum",
"axum-extra",
"camino",
"camino-tempfile",
"futures",
"http-body-util",
"itertools 0.10.5",
"jsonwebtoken",
"prometheus",
"rand 0.8.5",
"remote_storage",
"serde",
"serde_json",
"test-log",
"tokio",
"tokio-util",
"tower 0.5.2",
"tracing",
"utils",
"workspace_hack",
]
[[package]]
name = "enum-map"
version = "2.5.0"
@@ -4025,6 +3998,33 @@ dependencies = [
"memchr",
]
[[package]]
name = "object_storage"
version = "0.0.1"
dependencies = [
"anyhow",
"axum",
"axum-extra",
"camino",
"camino-tempfile",
"futures",
"http-body-util",
"itertools 0.10.5",
"jsonwebtoken",
"prometheus",
"rand 0.8.5",
"remote_storage",
"serde",
"serde_json",
"test-log",
"tokio",
"tokio-util",
"tower 0.5.2",
"tracing",
"utils",
"workspace_hack",
]
[[package]]
name = "once_cell"
version = "1.20.2"
@@ -4285,7 +4285,6 @@ dependencies = [
"pageserver_api",
"pageserver_client",
"pageserver_compaction",
"pem",
"pin-project-lite",
"postgres-protocol",
"postgres-types",
@@ -4353,7 +4352,6 @@ dependencies = [
"humantime-serde",
"itertools 0.10.5",
"nix 0.27.1",
"once_cell",
"postgres_backend",
"postgres_ffi",
"rand 0.8.5",
@@ -6002,7 +6000,6 @@ dependencies = [
"once_cell",
"pageserver_api",
"parking_lot 0.12.1",
"pem",
"postgres-protocol",
"postgres_backend",
"postgres_ffi",
@@ -8472,6 +8469,7 @@ dependencies = [
"once_cell",
"p256 0.13.2",
"parquet",
"pkcs8 0.10.2",
"prettyplease",
"proc-macro2",
"prost 0.13.3",

View File

@@ -40,7 +40,7 @@ members = [
"libs/proxy/postgres-protocol2",
"libs/proxy/postgres-types2",
"libs/proxy/tokio-postgres2",
"endpoint_storage",
"object_storage",
]
[workspace.package]
@@ -143,6 +143,7 @@ parquet_derive = "53"
pbkdf2 = { version = "0.12.1", features = ["simple", "std"] }
pem = "3.0.3"
pin-project-lite = "0.2"
pkcs8 = "0.10.2"
pprof = { version = "0.14", features = ["criterion", "flamegraph", "frame-pointer", "prost-codec"] }
procfs = "0.16"
prometheus = {version = "0.13", default-features=false, features = ["process"]} # removes protobuf dependency
@@ -175,7 +176,6 @@ signal-hook = "0.3"
smallvec = "1.11"
smol_str = { version = "0.2.0", features = ["serde"] }
socket2 = "0.5"
spki = "0.7.3"
strum = "0.26"
strum_macros = "0.26"
"subtle" = "2.5.0"

View File

@@ -89,7 +89,7 @@ RUN set -e \
--bin storage_broker \
--bin storage_controller \
--bin proxy \
--bin endpoint_storage \
--bin object_storage \
--bin neon_local \
--bin storage_scrubber \
--locked --release
@@ -122,7 +122,7 @@ COPY --from=build --chown=neon:neon /home/nonroot/target/release/safekeeper
COPY --from=build --chown=neon:neon /home/nonroot/target/release/storage_broker /usr/local/bin
COPY --from=build --chown=neon:neon /home/nonroot/target/release/storage_controller /usr/local/bin
COPY --from=build --chown=neon:neon /home/nonroot/target/release/proxy /usr/local/bin
COPY --from=build --chown=neon:neon /home/nonroot/target/release/endpoint_storage /usr/local/bin
COPY --from=build --chown=neon:neon /home/nonroot/target/release/object_storage /usr/local/bin
COPY --from=build --chown=neon:neon /home/nonroot/target/release/neon_local /usr/local/bin
COPY --from=build --chown=neon:neon /home/nonroot/target/release/storage_scrubber /usr/local/bin

View File

@@ -270,7 +270,7 @@ By default, this runs both debug and release modes, and all supported postgres v
testing locally, it is convenient to run just one set of permutations, like this:
```sh
DEFAULT_PG_VERSION=17 BUILD_TYPE=release ./scripts/pytest
DEFAULT_PG_VERSION=16 BUILD_TYPE=release ./scripts/pytest
```
## Flamegraphs

View File

@@ -12,5 +12,3 @@ disallowed-macros = [
# cannot disallow this, because clippy finds used from tokio macros
#"tokio::pin",
]
allow-unwrap-in-tests = true

View File

@@ -1677,7 +1677,7 @@ RUN set -e \
&& apt clean && rm -rf /var/lib/apt/lists/*
# Use `dist_man_MANS=` to skip manpage generation (which requires python3/pandoc)
ENV PGBOUNCER_TAG=pgbouncer_1_24_1
ENV PGBOUNCER_TAG=pgbouncer_1_22_1
RUN set -e \
&& git clone --recurse-submodules --depth 1 --branch ${PGBOUNCER_TAG} https://github.com/pgbouncer/pgbouncer.git pgbouncer \
&& cd pgbouncer \

View File

@@ -0,0 +1,265 @@
commit 00aa659afc9c7336ab81036edec3017168aabf40
Author: Heikki Linnakangas <heikki@neon.tech>
Date: Tue Nov 12 16:59:19 2024 +0200
Temporarily disable test that depends on timezone
diff --git a/tests/expected/generalization.out b/tests/expected/generalization.out
index 23ef5fa..9e60deb 100644
--- a/ext-src/pg_anon-src/tests/expected/generalization.out
+++ b/ext-src/pg_anon-src/tests/expected/generalization.out
@@ -284,12 +284,9 @@ SELECT anon.generalize_tstzrange('19041107','century');
["Tue Jan 01 00:00:00 1901 PST","Mon Jan 01 00:00:00 2001 PST")
(1 row)
-SELECT anon.generalize_tstzrange('19041107','millennium');
- generalize_tstzrange
------------------------------------------------------------------
- ["Thu Jan 01 00:00:00 1001 PST","Mon Jan 01 00:00:00 2001 PST")
-(1 row)
-
+-- temporarily disabled, see:
+-- https://gitlab.com/dalibo/postgresql_anonymizer/-/commit/199f0a392b37c59d92ae441fb8f037e094a11a52#note_2148017485
+--SELECT anon.generalize_tstzrange('19041107','millennium');
-- generalize_daterange
SELECT anon.generalize_daterange('19041107');
generalize_daterange
diff --git a/tests/sql/generalization.sql b/tests/sql/generalization.sql
index b868344..b4fc977 100644
--- a/ext-src/pg_anon-src/tests/sql/generalization.sql
+++ b/ext-src/pg_anon-src/tests/sql/generalization.sql
@@ -61,7 +61,9 @@ SELECT anon.generalize_tstzrange('19041107','month');
SELECT anon.generalize_tstzrange('19041107','year');
SELECT anon.generalize_tstzrange('19041107','decade');
SELECT anon.generalize_tstzrange('19041107','century');
-SELECT anon.generalize_tstzrange('19041107','millennium');
+-- temporarily disabled, see:
+-- https://gitlab.com/dalibo/postgresql_anonymizer/-/commit/199f0a392b37c59d92ae441fb8f037e094a11a52#note_2148017485
+--SELECT anon.generalize_tstzrange('19041107','millennium');
-- generalize_daterange
SELECT anon.generalize_daterange('19041107');
commit 7dd414ee75f2875cffb1d6ba474df1f135a6fc6f
Author: Alexey Masterov <alexeymasterov@neon.tech>
Date: Fri May 31 06:34:26 2024 +0000
These alternative expected files were added to consider the neon features
diff --git a/ext-src/pg_anon-src/tests/expected/permissions_masked_role_1.out b/ext-src/pg_anon-src/tests/expected/permissions_masked_role_1.out
new file mode 100644
index 0000000..2539cfd
--- /dev/null
+++ b/ext-src/pg_anon-src/tests/expected/permissions_masked_role_1.out
@@ -0,0 +1,101 @@
+BEGIN;
+CREATE EXTENSION anon CASCADE;
+NOTICE: installing required extension "pgcrypto"
+SELECT anon.init();
+ init
+------
+ t
+(1 row)
+
+CREATE ROLE mallory_the_masked_user;
+SECURITY LABEL FOR anon ON ROLE mallory_the_masked_user IS 'MASKED';
+CREATE TABLE t1(i INT);
+ALTER TABLE t1 ADD COLUMN t TEXT;
+SECURITY LABEL FOR anon ON COLUMN t1.t
+IS 'MASKED WITH VALUE NULL';
+INSERT INTO t1 VALUES (1,'test');
+--
+-- We're checking the owner's permissions
+--
+-- see
+-- https://postgresql-anonymizer.readthedocs.io/en/latest/SECURITY/#permissions
+--
+SET ROLE mallory_the_masked_user;
+SELECT anon.pseudo_first_name(0) IS NOT NULL;
+ ?column?
+----------
+ t
+(1 row)
+
+-- SHOULD FAIL
+DO $$
+BEGIN
+ PERFORM anon.init();
+ EXCEPTION WHEN insufficient_privilege
+ THEN RAISE NOTICE 'insufficient_privilege';
+END$$;
+NOTICE: insufficient_privilege
+-- SHOULD FAIL
+DO $$
+BEGIN
+ PERFORM anon.anonymize_table('t1');
+ EXCEPTION WHEN insufficient_privilege
+ THEN RAISE NOTICE 'insufficient_privilege';
+END$$;
+NOTICE: insufficient_privilege
+-- SHOULD FAIL
+SAVEPOINT fail_start_engine;
+SELECT anon.start_dynamic_masking();
+ERROR: Only supersusers can start the dynamic masking engine.
+CONTEXT: PL/pgSQL function anon.start_dynamic_masking(boolean) line 18 at RAISE
+ROLLBACK TO fail_start_engine;
+RESET ROLE;
+SELECT anon.start_dynamic_masking();
+ start_dynamic_masking
+-----------------------
+ t
+(1 row)
+
+SET ROLE mallory_the_masked_user;
+SELECT * FROM mask.t1;
+ i | t
+---+---
+ 1 |
+(1 row)
+
+-- SHOULD FAIL
+DO $$
+BEGIN
+ SELECT * FROM public.t1;
+ EXCEPTION WHEN insufficient_privilege
+ THEN RAISE NOTICE 'insufficient_privilege';
+END$$;
+NOTICE: insufficient_privilege
+-- SHOULD FAIL
+SAVEPOINT fail_stop_engine;
+SELECT anon.stop_dynamic_masking();
+ERROR: Only supersusers can stop the dynamic masking engine.
+CONTEXT: PL/pgSQL function anon.stop_dynamic_masking() line 18 at RAISE
+ROLLBACK TO fail_stop_engine;
+RESET ROLE;
+SELECT anon.stop_dynamic_masking();
+NOTICE: The previous priviledges of 'mallory_the_masked_user' are not restored. You need to grant them manually.
+ stop_dynamic_masking
+----------------------
+ t
+(1 row)
+
+SET ROLE mallory_the_masked_user;
+SELECT COUNT(*)=1 FROM anon.pg_masking_rules;
+ ?column?
+----------
+ t
+(1 row)
+
+-- SHOULD FAIL
+SAVEPOINT fail_seclabel_on_role;
+SECURITY LABEL FOR anon ON ROLE mallory_the_masked_user IS NULL;
+ERROR: permission denied
+DETAIL: The current user must have the CREATEROLE attribute.
+ROLLBACK TO fail_seclabel_on_role;
+ROLLBACK;
diff --git a/ext-src/pg_anon-src/tests/expected/permissions_owner_1.out b/ext-src/pg_anon-src/tests/expected/permissions_owner_1.out
new file mode 100644
index 0000000..8b090fe
--- /dev/null
+++ b/ext-src/pg_anon-src/tests/expected/permissions_owner_1.out
@@ -0,0 +1,104 @@
+BEGIN;
+CREATE EXTENSION anon CASCADE;
+NOTICE: installing required extension "pgcrypto"
+SELECT anon.init();
+ init
+------
+ t
+(1 row)
+
+CREATE ROLE oscar_the_owner;
+ALTER DATABASE :DBNAME OWNER TO oscar_the_owner;
+CREATE ROLE mallory_the_masked_user;
+SECURITY LABEL FOR anon ON ROLE mallory_the_masked_user IS 'MASKED';
+--
+-- We're checking the owner's permissions
+--
+-- see
+-- https://postgresql-anonymizer.readthedocs.io/en/latest/SECURITY/#permissions
+--
+SET ROLE oscar_the_owner;
+SELECT anon.pseudo_first_name(0) IS NOT NULL;
+ ?column?
+----------
+ t
+(1 row)
+
+-- SHOULD FAIL
+DO $$
+BEGIN
+ PERFORM anon.init();
+ EXCEPTION WHEN insufficient_privilege
+ THEN RAISE NOTICE 'insufficient_privilege';
+END$$;
+NOTICE: insufficient_privilege
+CREATE TABLE t1(i INT);
+ALTER TABLE t1 ADD COLUMN t TEXT;
+SECURITY LABEL FOR anon ON COLUMN t1.t
+IS 'MASKED WITH VALUE NULL';
+INSERT INTO t1 VALUES (1,'test');
+SELECT anon.anonymize_table('t1');
+ anonymize_table
+-----------------
+ t
+(1 row)
+
+SELECT * FROM t1;
+ i | t
+---+---
+ 1 |
+(1 row)
+
+UPDATE t1 SET t='test' WHERE i=1;
+-- SHOULD FAIL
+SAVEPOINT fail_start_engine;
+SELECT anon.start_dynamic_masking();
+ start_dynamic_masking
+-----------------------
+ t
+(1 row)
+
+ROLLBACK TO fail_start_engine;
+RESET ROLE;
+SELECT anon.start_dynamic_masking();
+ start_dynamic_masking
+-----------------------
+ t
+(1 row)
+
+SET ROLE oscar_the_owner;
+SELECT * FROM t1;
+ i | t
+---+------
+ 1 | test
+(1 row)
+
+--SELECT * FROM mask.t1;
+-- SHOULD FAIL
+SAVEPOINT fail_stop_engine;
+SELECT anon.stop_dynamic_masking();
+ERROR: permission denied for schema mask
+CONTEXT: SQL statement "DROP VIEW mask.t1;"
+PL/pgSQL function anon.mask_drop_view(oid) line 3 at EXECUTE
+SQL statement "SELECT anon.mask_drop_view(oid)
+ FROM pg_catalog.pg_class
+ WHERE relnamespace=quote_ident(pg_catalog.current_setting('anon.sourceschema'))::REGNAMESPACE
+ AND relkind IN ('r','p','f')"
+PL/pgSQL function anon.stop_dynamic_masking() line 22 at PERFORM
+ROLLBACK TO fail_stop_engine;
+RESET ROLE;
+SELECT anon.stop_dynamic_masking();
+NOTICE: The previous priviledges of 'mallory_the_masked_user' are not restored. You need to grant them manually.
+ stop_dynamic_masking
+----------------------
+ t
+(1 row)
+
+SET ROLE oscar_the_owner;
+-- SHOULD FAIL
+SAVEPOINT fail_seclabel_on_role;
+SECURITY LABEL FOR anon ON ROLE mallory_the_masked_user IS NULL;
+ERROR: permission denied
+DETAIL: The current user must have the CREATEROLE attribute.
+ROLLBACK TO fail_seclabel_on_role;
+ROLLBACK;

View File

@@ -11,14 +11,6 @@ index bf6edcb..89b4c7f 100644
USE_PGXS = 1 # use pgxs if not in contrib directory
PGXS := $(shell $(PG_CONFIG) --pgxs)
diff --git a/regress/expected/init-extension.out b/regress/expected/init-extension.out
index 9f2e171..f6e4f8d 100644
--- a/regress/expected/init-extension.out
+++ b/regress/expected/init-extension.out
@@ -1,3 +1,2 @@
SET client_min_messages = warning;
CREATE EXTENSION pg_repack;
-RESET client_min_messages;
diff --git a/regress/expected/nosuper.out b/regress/expected/nosuper.out
index 8d0a94e..63b68bf 100644
--- a/regress/expected/nosuper.out
@@ -50,14 +42,6 @@ index 8d0a94e..63b68bf 100644
INFO: repacking table "public.tbl_cluster"
ERROR: query failed: ERROR: current transaction is aborted, commands ignored until end of transaction block
DETAIL: query was: RESET lock_timeout
diff --git a/regress/sql/init-extension.sql b/regress/sql/init-extension.sql
index 9f2e171..f6e4f8d 100644
--- a/regress/sql/init-extension.sql
+++ b/regress/sql/init-extension.sql
@@ -1,3 +1,2 @@
SET client_min_messages = warning;
CREATE EXTENSION pg_repack;
-RESET client_min_messages;
diff --git a/regress/sql/nosuper.sql b/regress/sql/nosuper.sql
index 072f0fa..dbe60f8 100644
--- a/regress/sql/nosuper.sql

View File

@@ -15,7 +15,7 @@ index 7a4b88c..56678af 100644
HEADERS = src/halfvec.h src/sparsevec.h src/vector.h
diff --git a/src/hnswbuild.c b/src/hnswbuild.c
index b667478..1298aa1 100644
index b667478..dc95d89 100644
--- a/src/hnswbuild.c
+++ b/src/hnswbuild.c
@@ -843,9 +843,17 @@ HnswParallelBuildMain(dsm_segment *seg, shm_toc *toc)
@@ -36,7 +36,7 @@ index b667478..1298aa1 100644
/* Close relations within worker */
index_close(indexRel, indexLockmode);
table_close(heapRel, heapLockmode);
@@ -1100,13 +1108,25 @@ BuildIndex(Relation heap, Relation index, IndexInfo *indexInfo,
@@ -1100,12 +1108,39 @@ BuildIndex(Relation heap, Relation index, IndexInfo *indexInfo,
SeedRandom(42);
#endif
@@ -48,17 +48,32 @@ index b667478..1298aa1 100644
BuildGraph(buildstate, forkNum);
- if (RelationNeedsWAL(index) || forkNum == INIT_FORKNUM)
+#ifdef NEON_SMGR
+ smgr_finish_unlogged_build_phase_1(RelationGetSmgr(index));
+#endif
+
if (RelationNeedsWAL(index) || forkNum == INIT_FORKNUM)
+ if (RelationNeedsWAL(index) || forkNum == INIT_FORKNUM) {
log_newpage_range(index, forkNum, 0, RelationGetNumberOfBlocksInFork(index, forkNum), true);
+#ifdef NEON_SMGR
+ {
+#if PG_VERSION_NUM >= 160000
+ RelFileLocator rlocator = RelationGetSmgr(index)->smgr_rlocator.locator;
+#else
+ RelFileNode rlocator = RelationGetSmgr(index)->smgr_rnode.node;
+#endif
+ if (set_lwlsn_block_range_hook)
+ set_lwlsn_block_range_hook(XactLastRecEnd, rlocator,
+ MAIN_FORKNUM, 0, RelationGetNumberOfBlocks(index));
+ if (set_lwlsn_relation_hook)
+ set_lwlsn_relation_hook(XactLastRecEnd, rlocator, MAIN_FORKNUM);
+ }
+#endif
+ }
+
+#ifdef NEON_SMGR
+ smgr_end_unlogged_build(RelationGetSmgr(index));
+#endif
+
FreeBuildState(buildstate);
}

View File

@@ -1,5 +1,5 @@
diff --git a/src/ruminsert.c b/src/ruminsert.c
index 255e616..1c6edb7 100644
index 255e616..7a2240f 100644
--- a/src/ruminsert.c
+++ b/src/ruminsert.c
@@ -628,6 +628,10 @@ rumbuild(Relation heap, Relation index, struct IndexInfo *indexInfo)
@@ -24,12 +24,24 @@ index 255e616..1c6edb7 100644
/*
* Write index to xlog
*/
@@ -713,6 +721,10 @@ rumbuild(Relation heap, Relation index, struct IndexInfo *indexInfo)
@@ -713,6 +721,22 @@ rumbuild(Relation heap, Relation index, struct IndexInfo *indexInfo)
UnlockReleaseBuffer(buffer);
}
+#ifdef NEON_SMGR
+ smgr_end_unlogged_build(index->rd_smgr);
+ {
+#if PG_VERSION_NUM >= 160000
+ RelFileLocator rlocator = RelationGetSmgr(index)->smgr_rlocator.locator;
+#else
+ RelFileNode rlocator = RelationGetSmgr(index)->smgr_rnode.node;
+#endif
+ if (set_lwlsn_block_range_hook)
+ set_lwlsn_block_range_hook(XactLastRecEnd, rlocator, MAIN_FORKNUM, 0, RelationGetNumberOfBlocks(index));
+ if (set_lwlsn_relation_hook)
+ set_lwlsn_relation_hook(XactLastRecEnd, rlocator, MAIN_FORKNUM);
+
+ smgr_end_unlogged_build(index->rd_smgr);
+ }
+#endif
+
/*

View File

@@ -22,7 +22,7 @@ commands:
- name: local_proxy
user: postgres
sysvInitAction: respawn
shell: 'RUST_LOG="info,proxy::serverless::sql_over_http=warn" /usr/local/bin/local_proxy --config-path /etc/local_proxy/config.json --pid-path /etc/local_proxy/pid --http 0.0.0.0:10432'
shell: '/usr/local/bin/local_proxy --config-path /etc/local_proxy/config.json --pid-path /etc/local_proxy/pid --http 0.0.0.0:10432'
- name: postgres-exporter
user: nobody
sysvInitAction: respawn

View File

@@ -22,7 +22,7 @@ commands:
- name: local_proxy
user: postgres
sysvInitAction: respawn
shell: 'RUST_LOG="info,proxy::serverless::sql_over_http=warn" /usr/local/bin/local_proxy --config-path /etc/local_proxy/config.json --pid-path /etc/local_proxy/pid --http 0.0.0.0:10432'
shell: '/usr/local/bin/local_proxy --config-path /etc/local_proxy/config.json --pid-path /etc/local_proxy/pid --http 0.0.0.0:10432'
- name: postgres-exporter
user: nobody
sysvInitAction: respawn

View File

@@ -57,13 +57,24 @@ use tracing::{error, info};
use url::Url;
use utils::failpoint_support;
// Compatibility hack: if the control plane specified any remote-ext-config
// use the default value for extension storage proxy gateway.
// Remove this once the control plane is updated to pass the gateway URL
fn parse_remote_ext_config(arg: &str) -> Result<String> {
if arg.starts_with("http") {
Ok(arg.trim_end_matches('/').to_string())
} else {
Ok("http://pg-ext-s3-gateway".to_string())
}
}
#[derive(Parser)]
#[command(rename_all = "kebab-case")]
struct Cli {
#[arg(short = 'b', long, default_value = "postgres", env = "POSTGRES_PATH")]
pub pgbin: String,
#[arg(short = 'r', long)]
#[arg(short = 'r', long, value_parser = parse_remote_ext_config)]
pub remote_ext_config: Option<String>,
/// The port to bind the external listening HTTP server to. Clients running
@@ -105,7 +116,9 @@ struct Cli {
#[arg(long)]
pub set_disk_quota_for_fs: Option<String>,
#[arg(short = 'c', long)]
// TODO(tristan957): remove alias after compatibility tests are no longer
// an issue
#[arg(short = 'c', long, alias = "spec-path")]
pub config: Option<OsString>,
#[arg(short = 'i', long, group = "compute-id")]

View File

@@ -641,26 +641,7 @@ impl ComputeNode {
let log_directory_path = Path::new(&self.params.pgdata).join("log");
let log_directory_path = log_directory_path.to_string_lossy().to_string();
// Add project_id,endpoint_id tag to identify the logs.
//
// These ids are passed from cplane,
// for backwards compatibility (old computes that don't have them),
// we set them to None.
// TODO: Clean up this code when all computes have them.
let tag: Option<String> = match (
pspec.spec.project_id.as_deref(),
pspec.spec.endpoint_id.as_deref(),
) {
(Some(project_id), Some(endpoint_id)) => {
Some(format!("{project_id}/{endpoint_id}"))
}
(Some(project_id), None) => Some(format!("{project_id}/None")),
(None, Some(endpoint_id)) => Some(format!("None,{endpoint_id}")),
(None, None) => None,
};
configure_audit_rsyslog(log_directory_path.clone(), tag, &remote_endpoint)?;
configure_audit_rsyslog(log_directory_path.clone(), "hipaa", &remote_endpoint)?;
// Launch a background task to clean up the audit logs
launch_pgaudit_gc(log_directory_path);
@@ -916,32 +897,28 @@ impl ComputeNode {
let mut client = config.connect(NoTls)?;
let pageserver_connect_micros = start_time.elapsed().as_micros() as u64;
let basebackup_cmd = match lsn {
Lsn(0) => {
if spec.spec.mode != ComputeMode::Primary {
format!(
"basebackup {} {} --gzip --replica",
spec.tenant_id, spec.timeline_id
)
} else {
format!("basebackup {} {} --gzip", spec.tenant_id, spec.timeline_id)
}
}
_ => {
if spec.spec.mode != ComputeMode::Primary {
format!(
"basebackup {} {} {} --gzip --replica",
spec.tenant_id, spec.timeline_id, lsn
)
} else {
format!(
"basebackup {} {} {} --gzip",
spec.tenant_id, spec.timeline_id, lsn
)
}
}
};
let tenant_id = spec.tenant_id.to_string();
let timeline_id = spec.timeline_id.to_string();
let lsn_str = lsn.to_string();
let mut cmd = Vec::new();
cmd.push("basebackup");
cmd.push(&tenant_id);
cmd.push(&timeline_id);
if lsn != Lsn::INVALID {
cmd.push(&lsn_str);
}
cmd.push("--gzip");
if spec.spec.mode != ComputeMode::Primary {
cmd.push("--replica");
}
if spec
.spec
.features
.contains(&ComputeFeature::LazySlruDownload)
{
cmd.push("--lazy-slru-download")
}
let basebackup_cmd = cmd.join(" ");
let copyreader = client.copy_out(basebackup_cmd.as_str())?;
let mut measured_reader = MeasuredReader::new(copyreader);
let mut bufreader = std::io::BufReader::new(&mut measured_reader);

View File

@@ -6,5 +6,4 @@ pub(crate) mod request_id;
pub(crate) use json::Json;
pub(crate) use path::Path;
pub(crate) use query::Query;
#[allow(unused)]
pub(crate) use request_id::RequestId;

View File

@@ -1,7 +1,7 @@
use std::collections::HashSet;
use std::{collections::HashSet, net::SocketAddr};
use anyhow::{Result, anyhow};
use axum::{RequestExt, body::Body};
use axum::{RequestExt, body::Body, extract::ConnectInfo};
use axum_extra::{
TypedHeader,
headers::{Authorization, authorization::Bearer},
@@ -13,7 +13,7 @@ use jsonwebtoken::{Algorithm, DecodingKey, TokenData, Validation, jwk::JwkSet};
use tower_http::auth::AsyncAuthorizeRequest;
use tracing::{debug, warn};
use crate::http::JsonResponse;
use crate::http::{JsonResponse, extract::RequestId};
#[derive(Clone, Debug)]
pub(in crate::http) struct Authorize {
@@ -52,6 +52,31 @@ impl AsyncAuthorizeRequest<Body> for Authorize {
let validation = self.validation.clone();
Box::pin(async move {
let request_id = request.extract_parts::<RequestId>().await.unwrap();
// TODO(tristan957): Remove this stanza after teaching neon_local
// and the regression tests to use a JWT + JWKS.
//
// https://github.com/neondatabase/neon/issues/11316
if cfg!(feature = "testing") {
warn!(%request_id, "Skipping compute_ctl authorization check");
return Ok(request);
}
let connect_info = request
.extract_parts::<ConnectInfo<SocketAddr>>()
.await
.unwrap();
// In the event the request is coming from the loopback interface,
// allow all requests
if connect_info.ip().is_loopback() {
warn!(%request_id, "Bypassed authorization because request is coming from the loopback interface");
return Ok(request);
}
let TypedHeader(Authorization(bearer)) = request
.extract_parts::<TypedHeader<Authorization<Bearer>>>()
.await

View File

@@ -50,13 +50,13 @@ fn restart_rsyslog() -> Result<()> {
pub fn configure_audit_rsyslog(
log_directory: String,
tag: Option<String>,
tag: &str,
remote_endpoint: &str,
) -> Result<()> {
let config_content: String = format!(
include_str!("config_template/compute_audit_rsyslog_template.conf"),
log_directory = log_directory,
tag = tag.unwrap_or("".to_string()),
tag = tag,
remote_endpoint = remote_endpoint
);

View File

@@ -16,6 +16,7 @@ jsonwebtoken.workspace = true
nix.workspace = true
once_cell.workspace = true
pem.workspace = true
pkcs8.workspace = true
humantime-serde.workspace = true
hyper0.workspace = true
regex.workspace = true
@@ -24,7 +25,6 @@ scopeguard.workspace = true
serde.workspace = true
serde_json.workspace = true
sha2.workspace = true
spki.workspace = true
thiserror.workspace = true
toml.workspace = true
toml_edit.workspace = true

View File

@@ -18,11 +18,12 @@ use anyhow::{Context, Result, anyhow, bail};
use clap::Parser;
use compute_api::spec::ComputeMode;
use control_plane::endpoint::ComputeControlPlane;
use control_plane::endpoint_storage::{ENDPOINT_STORAGE_DEFAULT_PORT, EndpointStorage};
use control_plane::local_env::{
EndpointStorageConf, InitForceMode, LocalEnv, NeonBroker, NeonLocalInitConf,
NeonLocalInitPageserverConf, SafekeeperConf,
InitForceMode, LocalEnv, NeonBroker, NeonLocalInitConf, NeonLocalInitPageserverConf,
ObjectStorageConf, SafekeeperConf,
};
use control_plane::object_storage::OBJECT_STORAGE_DEFAULT_PORT;
use control_plane::object_storage::ObjectStorage;
use control_plane::pageserver::PageServerNode;
use control_plane::safekeeper::SafekeeperNode;
use control_plane::storage_controller::{
@@ -62,7 +63,7 @@ const DEFAULT_PAGESERVER_ID: NodeId = NodeId(1);
const DEFAULT_BRANCH_NAME: &str = "main";
project_git_version!(GIT_VERSION);
const DEFAULT_PG_VERSION: u32 = 17;
const DEFAULT_PG_VERSION: u32 = 16;
const DEFAULT_PAGESERVER_CONTROL_PLANE_API: &str = "http://127.0.0.1:1234/upcall/v1/";
@@ -92,7 +93,7 @@ enum NeonLocalCmd {
#[command(subcommand)]
Safekeeper(SafekeeperCmd),
#[command(subcommand)]
EndpointStorage(EndpointStorageCmd),
ObjectStorage(ObjectStorageCmd),
#[command(subcommand)]
Endpoint(EndpointCmd),
#[command(subcommand)]
@@ -459,14 +460,14 @@ enum SafekeeperCmd {
#[derive(clap::Subcommand)]
#[clap(about = "Manage object storage")]
enum EndpointStorageCmd {
Start(EndpointStorageStartCmd),
Stop(EndpointStorageStopCmd),
enum ObjectStorageCmd {
Start(ObjectStorageStartCmd),
Stop(ObjectStorageStopCmd),
}
#[derive(clap::Args)]
#[clap(about = "Start object storage")]
struct EndpointStorageStartCmd {
struct ObjectStorageStartCmd {
#[clap(short = 't', long, help = "timeout until we fail the command")]
#[arg(default_value = "10s")]
start_timeout: humantime::Duration,
@@ -474,7 +475,7 @@ struct EndpointStorageStartCmd {
#[derive(clap::Args)]
#[clap(about = "Stop object storage")]
struct EndpointStorageStopCmd {
struct ObjectStorageStopCmd {
#[arg(value_enum, default_value = "fast")]
#[clap(
short = 'm',
@@ -796,9 +797,7 @@ fn main() -> Result<()> {
}
NeonLocalCmd::StorageBroker(subcmd) => rt.block_on(handle_storage_broker(&subcmd, env)),
NeonLocalCmd::Safekeeper(subcmd) => rt.block_on(handle_safekeeper(&subcmd, env)),
NeonLocalCmd::EndpointStorage(subcmd) => {
rt.block_on(handle_endpoint_storage(&subcmd, env))
}
NeonLocalCmd::ObjectStorage(subcmd) => rt.block_on(handle_object_storage(&subcmd, env)),
NeonLocalCmd::Endpoint(subcmd) => rt.block_on(handle_endpoint(&subcmd, env)),
NeonLocalCmd::Mappings(subcmd) => handle_mappings(&subcmd, env),
};
@@ -1015,8 +1014,8 @@ fn handle_init(args: &InitCmdArgs) -> anyhow::Result<LocalEnv> {
}
})
.collect(),
endpoint_storage: EndpointStorageConf {
port: ENDPOINT_STORAGE_DEFAULT_PORT,
object_storage: ObjectStorageConf {
port: OBJECT_STORAGE_DEFAULT_PORT,
},
pg_distrib_dir: None,
neon_distrib_dir: None,
@@ -1545,7 +1544,7 @@ async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Res
.with_context(|| format!("postgres endpoint {endpoint_id} is not found"))?;
let jwt = endpoint.generate_jwt()?;
print!("{jwt}");
println!("{jwt}");
}
}
@@ -1736,15 +1735,12 @@ async fn handle_safekeeper(subcmd: &SafekeeperCmd, env: &local_env::LocalEnv) ->
Ok(())
}
async fn handle_endpoint_storage(
subcmd: &EndpointStorageCmd,
env: &local_env::LocalEnv,
) -> Result<()> {
use EndpointStorageCmd::*;
let storage = EndpointStorage::from_env(env);
async fn handle_object_storage(subcmd: &ObjectStorageCmd, env: &local_env::LocalEnv) -> Result<()> {
use ObjectStorageCmd::*;
let storage = ObjectStorage::from_env(env);
// In tests like test_forward_compatibility or test_graceful_cluster_restart
// old neon binaries (without endpoint_storage) are present
// old neon binaries (without object_storage) are present
if !storage.bin.exists() {
eprintln!(
"{} binary not found. Ignore if this is a compatibility test",
@@ -1754,13 +1750,13 @@ async fn handle_endpoint_storage(
}
match subcmd {
Start(EndpointStorageStartCmd { start_timeout }) => {
Start(ObjectStorageStartCmd { start_timeout }) => {
if let Err(e) = storage.start(start_timeout).await {
eprintln!("endpoint_storage start failed: {e}");
eprintln!("object_storage start failed: {e}");
exit(1);
}
}
Stop(EndpointStorageStopCmd { stop_mode }) => {
Stop(ObjectStorageStopCmd { stop_mode }) => {
let immediate = match stop_mode {
StopMode::Fast => false,
StopMode::Immediate => true,
@@ -1870,10 +1866,10 @@ async fn handle_start_all_impl(
}
js.spawn(async move {
EndpointStorage::from_env(env)
ObjectStorage::from_env(env)
.start(&retry_timeout)
.await
.map_err(|e| e.context("start endpoint_storage"))
.map_err(|e| e.context("start object_storage"))
});
})();
@@ -1972,9 +1968,9 @@ async fn try_stop_all(env: &local_env::LocalEnv, immediate: bool) {
}
}
let storage = EndpointStorage::from_env(env);
let storage = ObjectStorage::from_env(env);
if let Err(e) = storage.stop(immediate) {
eprintln!("endpoint_storage stop failed: {:#}", e);
eprintln!("object_storage stop failed: {:#}", e);
}
for ps_conf in &env.pageservers {

View File

@@ -60,12 +60,11 @@ use jsonwebtoken::jwk::{
use nix::sys::signal::{Signal, kill};
use pageserver_api::shard::ShardStripeSize;
use pem::Pem;
use pkcs8::der::Decode;
use reqwest::header::CONTENT_TYPE;
use safekeeper_api::membership::SafekeeperGeneration;
use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256};
use spki::der::Decode;
use spki::{SubjectPublicKeyInfo, SubjectPublicKeyInfoRef};
use tracing::debug;
use url::Host;
use utils::id::{NodeId, TenantId, TimelineId};
@@ -148,12 +147,11 @@ impl ComputeControlPlane {
/// Create a JSON Web Key Set. This ideally matches the way we create a JWKS
/// from the production control plane.
fn create_jwks_from_pem(pem: &Pem) -> Result<JwkSet> {
let spki: SubjectPublicKeyInfoRef = SubjectPublicKeyInfo::from_der(pem.contents())?;
let public_key = spki.subject_public_key.raw_bytes();
fn create_jwks_from_pem(pem: Pem) -> Result<JwkSet> {
let document = pkcs8::Document::from_der(&pem.into_contents())?;
let mut hasher = Sha256::new();
hasher.update(public_key);
hasher.update(&document);
let key_hash = hasher.finalize();
Ok(JwkSet {
@@ -171,7 +169,7 @@ impl ComputeControlPlane {
algorithm: AlgorithmParameters::OctetKeyPair(OctetKeyPairParameters {
key_type: OctetKeyPairType::OctetKeyPair,
curve: EllipticCurve::Ed25519,
x: base64::encode_config(public_key, base64::URL_SAFE_NO_PAD),
x: base64::encode_config(&document, base64::URL_SAFE_NO_PAD),
}),
}],
})
@@ -195,7 +193,7 @@ impl ComputeControlPlane {
let external_http_port = external_http_port.unwrap_or_else(|| self.get_port() + 1);
let internal_http_port = internal_http_port.unwrap_or_else(|| external_http_port + 1);
let compute_ctl_config = ComputeCtlConfig {
jwks: Self::create_jwks_from_pem(&self.env.read_public_key()?)?,
jwks: Self::create_jwks_from_pem(self.env.read_public_key()?)?,
tls: None::<TlsConfig>,
};
let ep = Arc::new(Endpoint {
@@ -766,6 +764,10 @@ impl Endpoint {
}
};
// TODO(tristan957): Remove the write to spec.json after compatibility
// tests work themselves out
let spec_path = self.endpoint_path().join("spec.json");
std::fs::write(spec_path, serde_json::to_string_pretty(&config.spec)?)?;
let config_path = self.endpoint_path().join("config.json");
std::fs::write(config_path, serde_json::to_string_pretty(&config)?)?;
@@ -775,6 +777,16 @@ impl Endpoint {
.append(true)
.open(self.endpoint_path().join("compute.log"))?;
// TODO(tristan957): Remove when compatibility tests are no longer an
// issue
let old_compute_ctl = {
let mut cmd = Command::new(self.env.neon_distrib_dir.join("compute_ctl"));
let help_output = cmd.arg("--help").output()?;
let help_output = String::from_utf8_lossy(&help_output.stdout);
!help_output.contains("--config")
};
// Launch compute_ctl
let conn_str = self.connstr("cloud_admin", "postgres");
println!("Starting postgres node at '{}'", conn_str);
@@ -793,8 +805,19 @@ impl Endpoint {
])
.args(["--pgdata", self.pgdata().to_str().unwrap()])
.args(["--connstr", &conn_str])
.arg("--config")
.arg(self.endpoint_path().join("config.json").as_os_str())
// TODO(tristan957): Change this to --config when compatibility tests
// are no longer an issue
.args([
"--spec-path",
self.endpoint_path()
.join(if old_compute_ctl {
"spec.json"
} else {
"config.json"
})
.to_str()
.unwrap(),
])
.args([
"--pgbin",
self.env

View File

@@ -9,8 +9,8 @@
mod background_process;
pub mod broker;
pub mod endpoint;
pub mod endpoint_storage;
pub mod local_env;
pub mod object_storage;
pub mod pageserver;
pub mod postgresql_conf;
pub mod safekeeper;

View File

@@ -19,11 +19,11 @@ use serde::{Deserialize, Serialize};
use utils::auth::encode_from_key_file;
use utils::id::{NodeId, TenantId, TenantTimelineId, TimelineId};
use crate::endpoint_storage::{ENDPOINT_STORAGE_REMOTE_STORAGE_DIR, EndpointStorage};
use crate::object_storage::{OBJECT_STORAGE_REMOTE_STORAGE_DIR, ObjectStorage};
use crate::pageserver::{PAGESERVER_REMOTE_STORAGE_DIR, PageServerNode};
use crate::safekeeper::SafekeeperNode;
pub const DEFAULT_PG_VERSION: u32 = 17;
pub const DEFAULT_PG_VERSION: u32 = 16;
//
// This data structures represents neon_local CLI config
@@ -72,7 +72,7 @@ pub struct LocalEnv {
pub safekeepers: Vec<SafekeeperConf>,
pub endpoint_storage: EndpointStorageConf,
pub object_storage: ObjectStorageConf,
// Control plane upcall API for pageserver: if None, we will not run storage_controller If set, this will
// be propagated into each pageserver's configuration.
@@ -110,7 +110,7 @@ pub struct OnDiskConfig {
)]
pub pageservers: Vec<PageServerConf>,
pub safekeepers: Vec<SafekeeperConf>,
pub endpoint_storage: EndpointStorageConf,
pub object_storage: ObjectStorageConf,
pub control_plane_api: Option<Url>,
pub control_plane_hooks_api: Option<Url>,
pub control_plane_compute_hook_api: Option<Url>,
@@ -144,7 +144,7 @@ pub struct NeonLocalInitConf {
pub storage_controller: Option<NeonStorageControllerConf>,
pub pageservers: Vec<NeonLocalInitPageserverConf>,
pub safekeepers: Vec<SafekeeperConf>,
pub endpoint_storage: EndpointStorageConf,
pub object_storage: ObjectStorageConf,
pub control_plane_api: Option<Url>,
pub control_plane_hooks_api: Option<Url>,
pub generate_local_ssl_certs: bool,
@@ -152,7 +152,7 @@ pub struct NeonLocalInitConf {
#[derive(Serialize, Default, Deserialize, PartialEq, Eq, Clone, Debug)]
#[serde(default)]
pub struct EndpointStorageConf {
pub struct ObjectStorageConf {
pub port: u16,
}
@@ -413,8 +413,8 @@ impl LocalEnv {
self.pg_dir(pg_version, "lib")
}
pub fn endpoint_storage_bin(&self) -> PathBuf {
self.neon_distrib_dir.join("endpoint_storage")
pub fn object_storage_bin(&self) -> PathBuf {
self.neon_distrib_dir.join("object_storage")
}
pub fn pageserver_bin(&self) -> PathBuf {
@@ -450,8 +450,8 @@ impl LocalEnv {
self.base_data_dir.join("safekeepers").join(data_dir_name)
}
pub fn endpoint_storage_data_dir(&self) -> PathBuf {
self.base_data_dir.join("endpoint_storage")
pub fn object_storage_data_dir(&self) -> PathBuf {
self.base_data_dir.join("object_storage")
}
pub fn get_pageserver_conf(&self, id: NodeId) -> anyhow::Result<&PageServerConf> {
@@ -615,7 +615,7 @@ impl LocalEnv {
control_plane_compute_hook_api: _,
branch_name_mappings,
generate_local_ssl_certs,
endpoint_storage,
object_storage,
} = on_disk_config;
LocalEnv {
base_data_dir: repopath.to_owned(),
@@ -632,7 +632,7 @@ impl LocalEnv {
control_plane_hooks_api,
branch_name_mappings,
generate_local_ssl_certs,
endpoint_storage,
object_storage,
}
};
@@ -742,7 +742,7 @@ impl LocalEnv {
control_plane_compute_hook_api: None,
branch_name_mappings: self.branch_name_mappings.clone(),
generate_local_ssl_certs: self.generate_local_ssl_certs,
endpoint_storage: self.endpoint_storage.clone(),
object_storage: self.object_storage.clone(),
},
)
}
@@ -849,7 +849,7 @@ impl LocalEnv {
control_plane_api,
generate_local_ssl_certs,
control_plane_hooks_api,
endpoint_storage,
object_storage,
} = conf;
// Find postgres binaries.
@@ -901,7 +901,7 @@ impl LocalEnv {
control_plane_hooks_api,
branch_name_mappings: Default::default(),
generate_local_ssl_certs,
endpoint_storage,
object_storage,
};
if generate_local_ssl_certs {
@@ -929,13 +929,13 @@ impl LocalEnv {
.context("pageserver init failed")?;
}
EndpointStorage::from_env(&env)
ObjectStorage::from_env(&env)
.init()
.context("object storage init failed")?;
// setup remote remote location for default LocalFs remote storage
std::fs::create_dir_all(env.base_data_dir.join(PAGESERVER_REMOTE_STORAGE_DIR))?;
std::fs::create_dir_all(env.base_data_dir.join(ENDPOINT_STORAGE_REMOTE_STORAGE_DIR))?;
std::fs::create_dir_all(env.base_data_dir.join(OBJECT_STORAGE_REMOTE_STORAGE_DIR))?;
env.persist_config()
}

View File

@@ -1,33 +1,34 @@
use crate::background_process::{self, start_process, stop_process};
use crate::local_env::LocalEnv;
use anyhow::anyhow;
use anyhow::{Context, Result};
use camino::Utf8PathBuf;
use std::io::Write;
use std::time::Duration;
/// Directory within .neon which will be used by default for LocalFs remote storage.
pub const ENDPOINT_STORAGE_REMOTE_STORAGE_DIR: &str = "local_fs_remote_storage/endpoint_storage";
pub const ENDPOINT_STORAGE_DEFAULT_PORT: u16 = 9993;
pub const OBJECT_STORAGE_REMOTE_STORAGE_DIR: &str = "local_fs_remote_storage/object_storage";
pub const OBJECT_STORAGE_DEFAULT_PORT: u16 = 9993;
pub struct EndpointStorage {
pub struct ObjectStorage {
pub bin: Utf8PathBuf,
pub data_dir: Utf8PathBuf,
pub pemfile: Utf8PathBuf,
pub port: u16,
}
impl EndpointStorage {
pub fn from_env(env: &LocalEnv) -> EndpointStorage {
EndpointStorage {
bin: Utf8PathBuf::from_path_buf(env.endpoint_storage_bin()).unwrap(),
data_dir: Utf8PathBuf::from_path_buf(env.endpoint_storage_data_dir()).unwrap(),
impl ObjectStorage {
pub fn from_env(env: &LocalEnv) -> ObjectStorage {
ObjectStorage {
bin: Utf8PathBuf::from_path_buf(env.object_storage_bin()).unwrap(),
data_dir: Utf8PathBuf::from_path_buf(env.object_storage_data_dir()).unwrap(),
pemfile: Utf8PathBuf::from_path_buf(env.public_key_path.clone()).unwrap(),
port: env.endpoint_storage.port,
port: env.object_storage.port,
}
}
fn config_path(&self) -> Utf8PathBuf {
self.data_dir.join("endpoint_storage.json")
self.data_dir.join("object_storage.json")
}
fn listen_addr(&self) -> Utf8PathBuf {
@@ -48,7 +49,7 @@ impl EndpointStorage {
let cfg = Cfg {
listen: self.listen_addr(),
pemfile: parent.join(self.pemfile.clone()),
local_path: parent.join(ENDPOINT_STORAGE_REMOTE_STORAGE_DIR),
local_path: parent.join(OBJECT_STORAGE_REMOTE_STORAGE_DIR),
r#type: "LocalFs".to_string(),
};
std::fs::create_dir_all(self.config_path().parent().unwrap())?;
@@ -58,19 +59,24 @@ impl EndpointStorage {
}
pub async fn start(&self, retry_timeout: &Duration) -> Result<()> {
println!("Starting endpoint_storage at {}", self.listen_addr());
println!("Starting s3 proxy at {}", self.listen_addr());
std::io::stdout().flush().context("flush stdout")?;
let process_status_check = || async {
let res = reqwest::Client::new().get(format!("http://{}/metrics", self.listen_addr()));
match res.send().await {
Ok(res) => Ok(res.status().is_success()),
Err(_) => Ok(false),
tokio::time::sleep(Duration::from_millis(500)).await;
let res = reqwest::Client::new()
.get(format!("http://{}/metrics", self.listen_addr()))
.send()
.await;
match res {
Ok(response) if response.status().is_success() => Ok(true),
Ok(_) => Err(anyhow!("Failed to query /metrics")),
Err(e) => Err(anyhow!("Failed to check node status: {e}")),
}
};
let res = start_process(
"endpoint_storage",
"object_storage",
&self.data_dir.clone().into_std_path_buf(),
&self.bin.clone().into_std_path_buf(),
vec![self.config_path().to_string()],
@@ -88,14 +94,14 @@ impl EndpointStorage {
}
pub fn stop(&self, immediate: bool) -> anyhow::Result<()> {
stop_process(immediate, "endpoint_storage", &self.pid_file())
stop_process(immediate, "object_storage", &self.pid_file())
}
fn log_file(&self) -> Utf8PathBuf {
self.data_dir.join("endpoint_storage.log")
self.data_dir.join("object_storage.log")
}
fn pid_file(&self) -> Utf8PathBuf {
self.data_dir.join("endpoint_storage.pid")
self.data_dir.join("object_storage.pid")
}
}

View File

@@ -413,11 +413,6 @@ impl PageServerNode {
.map(serde_json::from_str)
.transpose()
.context("Failed to parse 'compaction_algorithm' json")?,
compaction_shard_ancestor: settings
.remove("compaction_shard_ancestor")
.map(|x| x.parse::<bool>())
.transpose()
.context("Failed to parse 'compaction_shard_ancestor' as a bool")?,
compaction_l0_first: settings
.remove("compaction_l0_first")
.map(|x| x.parse::<bool>())

View File

@@ -1,3 +1,4 @@
# Example docker compose configuration
The configuration in this directory is used for testing Neon docker images: it is
@@ -7,13 +8,3 @@ you can experiment with a miniature Neon system, use `cargo neon` rather than co
This configuration does not start the storage controller, because the controller
needs a way to reconfigure running computes, and no such thing exists in this setup.
## Generating the JWKS for a compute
```shell
openssl genpkey -algorithm Ed25519 -out private-key.pem
openssl pkey -in private-key.pem -pubout -out public-key.pem
openssl pkey -pubin -inform pem -in public-key.pem -pubout -outform der -out public-key.der
key="$(xxd -plain -cols 32 -s -32 public-key.der)"
key_id="$(printf '%s' "$key" | sha256sum | awk '{ print $1 }' | basenc --base64url --wrap=0)"
x="$(printf '%s' "$key" | basenc --base64url --wrap=0)"
```

View File

@@ -1,3 +0,0 @@
-----BEGIN PRIVATE KEY-----
MC4CAQAwBQYDK2VwBCIEIOmnRbzt2AJ0d+S3aU1hiYOl/tXpvz1FmWBfwHYBgOma
-----END PRIVATE KEY-----

View File

@@ -1,3 +0,0 @@
-----BEGIN PUBLIC KEY-----
MCowBQYDK2VwAyEADY0al/U0bgB3+9fUGk+3PKWnsck9OyxN5DjHIN6Xep0=
-----END PUBLIC KEY-----

View File

@@ -81,9 +81,19 @@ sed -i "s/TIMELINE_ID/${timeline_id}/" ${CONFIG_FILE}
cat ${CONFIG_FILE}
# TODO(tristan957): Remove these workarounds for backwards compatibility after
# the next compute release. That includes these next few lines and the
# --spec-path in the compute_ctl invocation.
if compute_ctl --help | grep --quiet -- '--config'; then
SPEC_PATH="$CONFIG_FILE"
else
jq '.spec' < "$CONFIG_FILE" > /tmp/spec.json
SPEC_PATH=/tmp/spec.json
fi
echo "Start compute node"
/usr/local/bin/compute_ctl --pgdata /var/db/postgres/compute \
-C "postgresql://cloud_admin@localhost:55433/postgres" \
-b /usr/local/bin/postgres \
--compute-id "compute-$RANDOM" \
--config "$CONFIG_FILE"
--spec-path "$SPEC_PATH"

View File

@@ -142,19 +142,7 @@
},
"compute_ctl_config": {
"jwks": {
"keys": [
{
"use": "sig",
"key_ops": [
"verify"
],
"alg": "EdDSA",
"kid": "ZGIxMzAzOGY0YWQwODk2ODU1MTk1NzMxMDFkYmUyOWU2NzZkOWNjNjMyMGRkZGJjOWY0MjdjYWVmNzE1MjUyOAo=",
"kty": "OKP",
"crv": "Ed25519",
"x": "MGQ4ZDFhOTdmNTM0NmUwMDc3ZmJkN2Q0MWE0ZmI3M2NhNWE3YjFjOTNkM2IyYzRkZTQzOGM3MjBkZTk3N2E5ZAo="
}
]
"keys": []
}
}
}

View File

@@ -1,53 +0,0 @@
# Architecture Naming Scheme
## Summary
Neon computes are going to support multiple CPU architectures, including 64-bit
x86 and ARM. Architecture naming schemes are fairly inconsistent, at least when
it comes to these two architectures in particular. Sometimes 64-bit x86 is known
as `amd64` and at other times `x86_64`. For 64-bit ARM, it's a similar story
with `arm64` and `aarch64`.
## Motivation
Consistency when referring to these architectures across the platform in
important not only so that everyone is on the same page, but also because we
have architecture-dependent portions of the platform. One notable example is
remote extensions. Remote extensions need to be compiled and packaged into
compressed tarballs that are then downloaded by computes on demand. Downloading
a tarball for the wrong architecture will cause `dlopen(3)` to fail when
Postgres attempts to load the library. The compressed tarballs are located in an
S3 bucket with object keys of the form
`$BUILD_TAG/$ARCH/$PG_VERSION_NUM/${EXTENSION_NAME}.tar.zst` to mitigate the
potential for failure. The build and deployment pipeline for remote extensions
needs to be in lock step with the code in the compute that actually fetches
remote extensions.
## Impacted Components
- Control Plane when persisting the target architecture in compute flags if
specified.
- Austocaling when scheduling compute pods.
- `compute_ctl` when downloading remote extensions.
- Build and deployment pipeline for remote extensions.
## Prior Art for CPU Architecture Names
- [Rust](https://doc.rust-lang.org/std/env/consts/constant.ARCH.html)
- `x86_64`
- `aarch64`
- [Go](https://pkg.go.dev/internal/goarch#pkg-constants)
- `amd64`
- `arm64`
- Kubernetes
- Because Kubernetes is written in Go, it inherits the naming scheme.
Going all in on either the Rust naming scheme or the Go naming scheme can save
us a branch when getting the architecture, so we should pick one and force the
other side to conform.
## Decision
The heart of our platform is Kubernetes. We will inherit the Go naming
scheme just like Kubernetes. This makes things easy for the autoscaling team
when they do pod scheduling on nodes.

View File

@@ -183,6 +183,9 @@ pub enum ComputeFeature {
/// track short-lived connections as user activity.
ActivityMonitorExperimental,
/// Download all SLRU files on demand
LazySlruDownload,
/// This is a special feature flag that is used to represent unknown feature flags.
/// Basically all unknown to enum flags are represented as this one. See unit test
/// `parse_unknown_features()` for more details.
@@ -242,22 +245,13 @@ impl RemoteExtSpec {
match self.extension_data.get(real_ext_name) {
Some(_ext_data) => {
// We have decided to use the Go naming convention due to Kubernetes.
let arch = match std::env::consts::ARCH {
"x86_64" => "amd64",
"aarch64" => "arm64",
arch => arch,
};
// Construct the path to the extension archive
// BUILD_TAG/PG_MAJOR_VERSION/extensions/EXTENSION_NAME.tar.zst
//
// Keep it in sync with path generation in
// https://github.com/neondatabase/build-custom-extensions/tree/main
let archive_path_str = format!(
"{build_tag}/{arch}/{pg_major_version}/extensions/{real_ext_name}.tar.zst"
);
let archive_path_str =
format!("{build_tag}/{pg_major_version}/extensions/{real_ext_name}.tar.zst");
Ok((
real_ext_name.to_string(),
RemotePath::from_string(&archive_path_str)?,

View File

@@ -35,7 +35,6 @@ nix = {workspace = true, optional = true}
reqwest.workspace = true
rand.workspace = true
tracing-utils.workspace = true
once_cell.workspace = true
[dev-dependencies]
bincode.workspace = true

View File

@@ -379,8 +379,6 @@ pub struct TenantConfigToml {
/// size exceeds `compaction_upper_limit * checkpoint_distance`.
pub compaction_upper_limit: usize,
pub compaction_algorithm: crate::models::CompactionAlgorithmSettings,
/// If true, enable shard ancestor compaction (enabled by default).
pub compaction_shard_ancestor: bool,
/// If true, compact down L0 across all tenant timelines before doing regular compaction. L0
/// compaction must be responsive to avoid read amp during heavy ingestion. Defaults to true.
pub compaction_l0_first: bool,
@@ -679,13 +677,12 @@ pub mod tenant_conf_defaults {
pub const DEFAULT_COMPACTION_PERIOD: &str = "20 s";
pub const DEFAULT_COMPACTION_THRESHOLD: usize = 10;
pub const DEFAULT_COMPACTION_SHARD_ANCESTOR: bool = true;
// This value needs to be tuned to avoid OOM. We have 3/4*CPUs threads for L0 compaction, that's
// 3/4*8=6 on most of our pageservers. Compacting 10 layers requires a maximum of
// DEFAULT_CHECKPOINT_DISTANCE*10 memory, that's 2560MB. So with this config, we can get a maximum peak
// compaction usage of 15360MB.
pub const DEFAULT_COMPACTION_UPPER_LIMIT: usize = 10;
// 3/4*16=9 on most of our pageservers. Compacting 20 layers requires about 1 GB memory (could
// be reduced later by optimizing L0 hole calculation to avoid loading all keys into memory). So
// with this config, we can get a maximum peak compaction usage of 9 GB.
pub const DEFAULT_COMPACTION_UPPER_LIMIT: usize = 20;
// Enable L0 compaction pass and semaphore by default. L0 compaction must be responsive to avoid
// read amp.
pub const DEFAULT_COMPACTION_L0_FIRST: bool = true;
@@ -702,11 +699,8 @@ pub mod tenant_conf_defaults {
// Relevant: https://github.com/neondatabase/neon/issues/3394
pub const DEFAULT_GC_PERIOD: &str = "1 hr";
pub const DEFAULT_IMAGE_CREATION_THRESHOLD: usize = 3;
// Currently, any value other than 0 will trigger image layer creation preemption immediately with L0 backpressure
// without looking at the exact number of L0 layers.
// It was expected to have the following behavior:
// > If there are more than threshold * compaction_threshold (that is 3 * 10 in the default config) L0 layers, image
// > layer creation will end immediately. Set to 0 to disable.
// If there are more than threshold * compaction_threshold (that is 3 * 10 in the default config) L0 layers, image
// layer creation will end immediately. Set to 0 to disable.
pub const DEFAULT_IMAGE_CREATION_PREEMPT_THRESHOLD: usize = 3;
pub const DEFAULT_PITR_INTERVAL: &str = "7 days";
pub const DEFAULT_WALRECEIVER_CONNECT_TIMEOUT: &str = "10 seconds";
@@ -740,7 +734,6 @@ impl Default for TenantConfigToml {
compaction_algorithm: crate::models::CompactionAlgorithmSettings {
kind: DEFAULT_COMPACTION_ALGORITHM,
},
compaction_shard_ancestor: DEFAULT_COMPACTION_SHARD_ANCESTOR,
compaction_l0_first: DEFAULT_COMPACTION_L0_FIRST,
compaction_l0_semaphore: DEFAULT_COMPACTION_L0_SEMAPHORE,
l0_flush_delay_threshold: None,

View File

@@ -526,8 +526,6 @@ pub struct TenantConfigPatch {
#[serde(skip_serializing_if = "FieldPatch::is_noop")]
pub compaction_algorithm: FieldPatch<CompactionAlgorithmSettings>,
#[serde(skip_serializing_if = "FieldPatch::is_noop")]
pub compaction_shard_ancestor: FieldPatch<bool>,
#[serde(skip_serializing_if = "FieldPatch::is_noop")]
pub compaction_l0_first: FieldPatch<bool>,
#[serde(skip_serializing_if = "FieldPatch::is_noop")]
pub compaction_l0_semaphore: FieldPatch<bool>,
@@ -617,9 +615,6 @@ pub struct TenantConfig {
#[serde(skip_serializing_if = "Option::is_none")]
pub compaction_algorithm: Option<CompactionAlgorithmSettings>,
#[serde(skip_serializing_if = "Option::is_none")]
pub compaction_shard_ancestor: Option<bool>,
#[serde(skip_serializing_if = "Option::is_none")]
pub compaction_l0_first: Option<bool>,
@@ -729,7 +724,6 @@ impl TenantConfig {
mut compaction_threshold,
mut compaction_upper_limit,
mut compaction_algorithm,
mut compaction_shard_ancestor,
mut compaction_l0_first,
mut compaction_l0_semaphore,
mut l0_flush_delay_threshold,
@@ -778,9 +772,6 @@ impl TenantConfig {
.compaction_upper_limit
.apply(&mut compaction_upper_limit);
patch.compaction_algorithm.apply(&mut compaction_algorithm);
patch
.compaction_shard_ancestor
.apply(&mut compaction_shard_ancestor);
patch.compaction_l0_first.apply(&mut compaction_l0_first);
patch
.compaction_l0_semaphore
@@ -869,7 +860,6 @@ impl TenantConfig {
compaction_threshold,
compaction_upper_limit,
compaction_algorithm,
compaction_shard_ancestor,
compaction_l0_first,
compaction_l0_semaphore,
l0_flush_delay_threshold,
@@ -930,9 +920,6 @@ impl TenantConfig {
.as_ref()
.unwrap_or(&global_conf.compaction_algorithm)
.clone(),
compaction_shard_ancestor: self
.compaction_shard_ancestor
.unwrap_or(global_conf.compaction_shard_ancestor),
compaction_l0_first: self
.compaction_l0_first
.unwrap_or(global_conf.compaction_l0_first),
@@ -1817,34 +1804,8 @@ pub mod virtual_file {
}
impl IoMode {
pub fn preferred() -> Self {
// The default behavior when running Rust unit tests without any further
// flags is to use the newest behavior if available on the platform (Direct).
// The CI uses the following environment variable to unit tests for all
// different modes.
// NB: the Python regression & perf tests have their own defaults management
// that writes pageserver.toml; they do not use this variable.
if cfg!(test) {
use once_cell::sync::Lazy;
static CACHED: Lazy<IoMode> = Lazy::new(|| {
utils::env::var_serde_json_string(
"NEON_PAGESERVER_UNIT_TEST_VIRTUAL_FILE_IO_MODE",
)
.unwrap_or({
#[cfg(target_os = "linux")]
{
IoMode::Direct
}
#[cfg(not(target_os = "linux"))]
{
IoMode::Buffered
}
})
});
*CACHED
} else {
IoMode::Buffered
}
pub const fn preferred() -> Self {
Self::Buffered
}
}

View File

@@ -1,5 +1,5 @@
[package]
name = "endpoint_storage"
name = "object_storage"
version = "0.0.1"
edition.workspace = true
license.workspace = true

View File

@@ -2,7 +2,7 @@ use anyhow::anyhow;
use axum::body::{Body, Bytes};
use axum::response::{IntoResponse, Response};
use axum::{Router, http::StatusCode};
use endpoint_storage::{PrefixS3Path, S3Path, Storage, bad_request, internal_error, not_found, ok};
use object_storage::{PrefixS3Path, S3Path, Storage, bad_request, internal_error, not_found, ok};
use remote_storage::TimeoutOrCancel;
use remote_storage::{DownloadError, DownloadOpts, GenericRemoteStorage, RemotePath};
use std::{sync::Arc, time::SystemTime, time::UNIX_EPOCH};
@@ -46,12 +46,12 @@ async fn metrics() -> Result {
async fn get(S3Path { path }: S3Path, state: State) -> Result {
info!(%path, "downloading");
let download_err = |err| {
if let DownloadError::NotFound = err {
info!(%path, %err, "downloading"); // 404 is not an issue of _this_ service
let download_err = |e| {
if let DownloadError::NotFound = e {
info!(%path, %e, "downloading"); // 404 is not an issue of _this_ service
return not_found(&path);
}
internal_error(err, &path, "downloading")
internal_error(e, &path, "downloading")
};
let cancel = state.cancel.clone();
let opts = &DownloadOpts::default();
@@ -249,7 +249,7 @@ mod tests {
};
let proxy = Storage {
auth: endpoint_storage::JwtAuth::new(TEST_PUB_KEY_ED25519).unwrap(),
auth: object_storage::JwtAuth::new(TEST_PUB_KEY_ED25519).unwrap(),
storage,
cancel: cancel.clone(),
max_upload_file_limit: usize::MAX,
@@ -343,14 +343,14 @@ MC4CAQAwBQYDK2VwBCIEID/Drmc1AA6U/znNRWpF3zEGegOATQxfkdWxitcOMsIH
TimelineId::from_array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 1, 2, 3, 4, 5, 7]);
const ENDPOINT_ID: &str = "ep-winter-frost-a662z3vg";
fn token() -> String {
let claims = endpoint_storage::Claims {
let claims = object_storage::Claims {
tenant_id: TENANT_ID,
timeline_id: TIMELINE_ID,
endpoint_id: ENDPOINT_ID.into(),
exp: u64::MAX,
};
let key = jsonwebtoken::EncodingKey::from_ed_pem(TEST_PRIV_KEY_ED25519).unwrap();
let header = jsonwebtoken::Header::new(endpoint_storage::VALIDATION_ALGO);
let header = jsonwebtoken::Header::new(object_storage::VALIDATION_ALGO);
jsonwebtoken::encode(&header, &claims, &key).unwrap()
}
@@ -364,10 +364,7 @@ MC4CAQAwBQYDK2VwBCIEID/Drmc1AA6U/znNRWpF3zEGegOATQxfkdWxitcOMsIH
vec![TIMELINE_ID.to_string(), TimelineId::generate().to_string()],
vec![ENDPOINT_ID, "ep-ololo"]
)
// first one is fully valid path, second path is valid for GET as
// read paths may have different endpoint if tenant and timeline matches
// (needed for prewarming RO->RW replica)
.skip(2);
.skip(1);
for ((uri, method), (tenant, timeline, endpoint)) in iproduct!(routes(), args) {
info!(%uri, %method, %tenant, %timeline, %endpoint);
@@ -478,16 +475,6 @@ MC4CAQAwBQYDK2VwBCIEID/Drmc1AA6U/znNRWpF3zEGegOATQxfkdWxitcOMsIH
requests_chain(chain.into_iter(), |_| token()).await;
}
#[testlog(tokio::test)]
async fn read_other_endpoint_data() {
let uri = format!("/{TENANT_ID}/{TIMELINE_ID}/other_endpoint/key");
let chain = vec![
(uri.clone(), "GET", "", StatusCode::NOT_FOUND, false),
(uri.clone(), "PUT", "", StatusCode::UNAUTHORIZED, false),
];
requests_chain(chain.into_iter(), |_| token()).await;
}
fn delete_prefix_token(uri: &str) -> String {
use serde::Serialize;
let parts = uri.split("/").collect::<Vec<&str>>();
@@ -495,7 +482,7 @@ MC4CAQAwBQYDK2VwBCIEID/Drmc1AA6U/znNRWpF3zEGegOATQxfkdWxitcOMsIH
struct PrefixClaims {
tenant_id: TenantId,
timeline_id: Option<TimelineId>,
endpoint_id: Option<endpoint_storage::EndpointId>,
endpoint_id: Option<object_storage::EndpointId>,
exp: u64,
}
let claims = PrefixClaims {
@@ -505,7 +492,7 @@ MC4CAQAwBQYDK2VwBCIEID/Drmc1AA6U/znNRWpF3zEGegOATQxfkdWxitcOMsIH
exp: u64::MAX,
};
let key = jsonwebtoken::EncodingKey::from_ed_pem(TEST_PRIV_KEY_ED25519).unwrap();
let header = jsonwebtoken::Header::new(endpoint_storage::VALIDATION_ALGO);
let header = jsonwebtoken::Header::new(object_storage::VALIDATION_ALGO);
jsonwebtoken::encode(&header, &claims, &key).unwrap()
}

View File

@@ -169,19 +169,10 @@ impl FromRequestParts<Arc<Storage>> for S3Path {
.auth
.decode(bearer.token())
.map_err(|e| bad_request(e, "decoding token"))?;
// Read paths may have different endpoint ids. For readonly -> readwrite replica
// prewarming, endpoint must read other endpoint's data.
let endpoint_id = if parts.method == axum::http::Method::GET {
claims.endpoint_id.clone()
} else {
path.endpoint_id.clone()
};
let route = Claims {
tenant_id: path.tenant_id,
timeline_id: path.timeline_id,
endpoint_id,
endpoint_id: path.endpoint_id.clone(),
exp: claims.exp,
};
if route != claims {

View File

@@ -1,4 +1,4 @@
//! `endpoint_storage` is a service which provides API for uploading and downloading
//! `object_storage` is a service which provides API for uploading and downloading
//! files. It is used by compute and control plane for accessing LFC prewarm data.
//! This service is deployed either as a separate component or as part of compute image
//! for large computes.
@@ -33,7 +33,7 @@ async fn main() -> anyhow::Result<()> {
let config: String = std::env::args().skip(1).take(1).collect();
if config.is_empty() {
anyhow::bail!("Usage: endpoint_storage config.json")
anyhow::bail!("Usage: object_storage config.json")
}
info!("Reading config from {config}");
let config = std::fs::read_to_string(config.clone())?;
@@ -41,7 +41,7 @@ async fn main() -> anyhow::Result<()> {
info!("Reading pemfile from {}", config.pemfile.clone());
let pemfile = std::fs::read(config.pemfile.clone())?;
info!("Loading public key from {}", config.pemfile.clone());
let auth = endpoint_storage::JwtAuth::new(&pemfile)?;
let auth = object_storage::JwtAuth::new(&pemfile)?;
let listener = tokio::net::TcpListener::bind(config.listen).await.unwrap();
info!("listening on {}", listener.local_addr().unwrap());
@@ -50,7 +50,7 @@ async fn main() -> anyhow::Result<()> {
let cancel = tokio_util::sync::CancellationToken::new();
app::check_storage_permissions(&storage, cancel.clone()).await?;
let proxy = std::sync::Arc::new(endpoint_storage::Storage {
let proxy = std::sync::Arc::new(object_storage::Storage {
auth,
storage,
cancel: cancel.clone(),

View File

@@ -78,7 +78,6 @@ metrics.workspace = true
pageserver_api.workspace = true
pageserver_client.workspace = true # for ResponseErrorMessageExt TOOD refactor that
pageserver_compaction.workspace = true
pem.workspace = true
postgres_connection.workspace = true
postgres_ffi.workspace = true
pq_proto.workspace = true

View File

@@ -68,13 +68,6 @@ pub(crate) struct Args {
targets: Option<Vec<TenantTimelineId>>,
}
/// State shared by all clients
#[derive(Debug)]
struct SharedState {
start_work_barrier: tokio::sync::Barrier,
live_stats: LiveStats,
}
#[derive(Debug, Default)]
struct LiveStats {
completed_requests: AtomicU64,
@@ -247,26 +240,24 @@ async fn main_impl(
all_ranges
};
let live_stats = Arc::new(LiveStats::default());
let num_live_stats_dump = 1;
let num_work_sender_tasks = args.num_clients.get() * timelines.len();
let num_main_impl = 1;
let shared_state = Arc::new(SharedState {
start_work_barrier: tokio::sync::Barrier::new(
num_live_stats_dump + num_work_sender_tasks + num_main_impl,
),
live_stats: LiveStats::default(),
});
let cancel = CancellationToken::new();
let start_work_barrier = Arc::new(tokio::sync::Barrier::new(
num_live_stats_dump + num_work_sender_tasks + num_main_impl,
));
let ss = shared_state.clone();
tokio::spawn({
let stats = Arc::clone(&live_stats);
let start_work_barrier = Arc::clone(&start_work_barrier);
async move {
ss.start_work_barrier.wait().await;
start_work_barrier.wait().await;
loop {
let start = std::time::Instant::now();
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
let stats = &ss.live_stats;
let completed_requests = stats.completed_requests.swap(0, Ordering::Relaxed);
let missed = stats.missed.swap(0, Ordering::Relaxed);
let elapsed = start.elapsed();
@@ -279,12 +270,14 @@ async fn main_impl(
}
});
let cancel = CancellationToken::new();
let rps_period = args
.per_client_rate
.map(|rps_limit| Duration::from_secs_f64(1.0 / (rps_limit as f64)));
let make_worker: &dyn Fn(WorkerId) -> Pin<Box<dyn Send + Future<Output = ()>>> = &|worker_id| {
let ss = shared_state.clone();
let cancel = cancel.clone();
let live_stats = live_stats.clone();
let start_work_barrier = start_work_barrier.clone();
let ranges: Vec<KeyRange> = all_ranges
.iter()
.filter(|r| r.timeline == worker_id.timeline)
@@ -294,8 +287,85 @@ async fn main_impl(
rand::distributions::weighted::WeightedIndex::new(ranges.iter().map(|v| v.len()))
.unwrap();
let cancel = cancel.clone();
Box::pin(async move {
client_libpq(args, worker_id, ss, cancel, rps_period, ranges, weights).await
let client =
pageserver_client::page_service::Client::new(args.page_service_connstring.clone())
.await
.unwrap();
let mut client = client
.pagestream(worker_id.timeline.tenant_id, worker_id.timeline.timeline_id)
.await
.unwrap();
start_work_barrier.wait().await;
let client_start = Instant::now();
let mut ticks_processed = 0;
let mut inflight = VecDeque::new();
while !cancel.is_cancelled() {
// Detect if a request took longer than the RPS rate
if let Some(period) = &rps_period {
let periods_passed_until_now =
usize::try_from(client_start.elapsed().as_micros() / period.as_micros())
.unwrap();
if periods_passed_until_now > ticks_processed {
live_stats.missed((periods_passed_until_now - ticks_processed) as u64);
}
ticks_processed = periods_passed_until_now;
}
while inflight.len() < args.queue_depth.get() {
let start = Instant::now();
let req = {
let mut rng = rand::thread_rng();
let r = &ranges[weights.sample(&mut rng)];
let key: i128 = rng.gen_range(r.start..r.end);
let key = Key::from_i128(key);
assert!(key.is_rel_block_key());
let (rel_tag, block_no) = key
.to_rel_block()
.expect("we filter non-rel-block keys out above");
PagestreamGetPageRequest {
hdr: PagestreamRequest {
reqid: 0,
request_lsn: if rng.gen_bool(args.req_latest_probability) {
Lsn::MAX
} else {
r.timeline_lsn
},
not_modified_since: r.timeline_lsn,
},
rel: rel_tag,
blkno: block_no,
}
};
client.getpage_send(req).await.unwrap();
inflight.push_back(start);
}
let start = inflight.pop_front().unwrap();
client.getpage_recv().await.unwrap();
let end = Instant::now();
live_stats.request_done();
ticks_processed += 1;
STATS.with(|stats| {
stats
.borrow()
.lock()
.unwrap()
.observe(end.duration_since(start))
.unwrap();
});
if let Some(period) = &rps_period {
let next_at = client_start
+ Duration::from_micros(
(ticks_processed) as u64 * u64::try_from(period.as_micros()).unwrap(),
);
tokio::time::sleep_until(next_at.into()).await;
}
}
})
};
@@ -317,7 +387,7 @@ async fn main_impl(
};
info!("waiting for everything to become ready");
shared_state.start_work_barrier.wait().await;
start_work_barrier.wait().await;
info!("work started");
if let Some(runtime) = args.runtime {
tokio::time::sleep(runtime.into()).await;
@@ -346,91 +416,3 @@ async fn main_impl(
anyhow::Ok(())
}
async fn client_libpq(
args: &Args,
worker_id: WorkerId,
shared_state: Arc<SharedState>,
cancel: CancellationToken,
rps_period: Option<Duration>,
ranges: Vec<KeyRange>,
weights: rand::distributions::weighted::WeightedIndex<i128>,
) {
let client = pageserver_client::page_service::Client::new(args.page_service_connstring.clone())
.await
.unwrap();
let mut client = client
.pagestream(worker_id.timeline.tenant_id, worker_id.timeline.timeline_id)
.await
.unwrap();
shared_state.start_work_barrier.wait().await;
let client_start = Instant::now();
let mut ticks_processed = 0;
let mut inflight = VecDeque::new();
while !cancel.is_cancelled() {
// Detect if a request took longer than the RPS rate
if let Some(period) = &rps_period {
let periods_passed_until_now =
usize::try_from(client_start.elapsed().as_micros() / period.as_micros()).unwrap();
if periods_passed_until_now > ticks_processed {
shared_state
.live_stats
.missed((periods_passed_until_now - ticks_processed) as u64);
}
ticks_processed = periods_passed_until_now;
}
while inflight.len() < args.queue_depth.get() {
let start = Instant::now();
let req = {
let mut rng = rand::thread_rng();
let r = &ranges[weights.sample(&mut rng)];
let key: i128 = rng.gen_range(r.start..r.end);
let key = Key::from_i128(key);
assert!(key.is_rel_block_key());
let (rel_tag, block_no) = key
.to_rel_block()
.expect("we filter non-rel-block keys out above");
PagestreamGetPageRequest {
hdr: PagestreamRequest {
reqid: 0,
request_lsn: if rng.gen_bool(args.req_latest_probability) {
Lsn::MAX
} else {
r.timeline_lsn
},
not_modified_since: r.timeline_lsn,
},
rel: rel_tag,
blkno: block_no,
}
};
client.getpage_send(req).await.unwrap();
inflight.push_back(start);
}
let start = inflight.pop_front().unwrap();
client.getpage_recv().await.unwrap();
let end = Instant::now();
shared_state.live_stats.request_done();
ticks_processed += 1;
STATS.with(|stats| {
stats
.borrow()
.lock()
.unwrap()
.observe(end.duration_since(start))
.unwrap();
});
if let Some(period) = &rps_period {
let next_at = client_start
+ Duration::from_micros(
(ticks_processed) as u64 * u64::try_from(period.as_micros()).unwrap(),
);
tokio::time::sleep_until(next_at.into()).await;
}
}
}

View File

@@ -73,6 +73,7 @@ impl From<GetVectoredError> for BasebackupError {
/// * When working without safekeepers. In this situation it is important to match the lsn
/// we are taking basebackup on with the lsn that is used in pageserver's walreceiver
/// to start the replication.
#[allow(clippy::too_many_arguments)]
pub async fn send_basebackup_tarball<'a, W>(
write: &'a mut W,
timeline: &'a Timeline,
@@ -80,6 +81,7 @@ pub async fn send_basebackup_tarball<'a, W>(
prev_lsn: Option<Lsn>,
full_backup: bool,
replica: bool,
lazy_slru_download_enabled: bool,
ctx: &'a RequestContext,
) -> Result<(), BasebackupError>
where
@@ -131,8 +133,8 @@ where
};
info!(
"taking basebackup lsn={}, prev_lsn={} (full_backup={}, replica={})",
backup_lsn, prev_lsn, full_backup, replica
"taking basebackup lsn={}, prev_lsn={} (full_backup={}, replica={}, lazy_slru_download_enabled={})",
backup_lsn, prev_lsn, full_backup, replica, lazy_slru_download_enabled
);
let basebackup = Basebackup {
@@ -142,6 +144,7 @@ where
prev_record_lsn: prev_lsn,
full_backup,
replica,
lazy_slru_download_enabled,
ctx,
io_concurrency: IoConcurrency::spawn_from_conf(
timeline.conf,
@@ -170,6 +173,7 @@ where
prev_record_lsn: Lsn,
full_backup: bool,
replica: bool,
lazy_slru_download_enabled: bool,
ctx: &'a RequestContext,
io_concurrency: IoConcurrency,
}
@@ -308,7 +312,10 @@ where
self.timeline.pg_version,
)?;
let lazy_slru_download = self.timeline.get_lazy_slru_download() && !self.full_backup;
let lazy_slru_download = self
.timeline
.get_lazy_slru_download(self.lazy_slru_download_enabled)
&& !self.full_backup;
let pgversion = self.timeline.pg_version;
let subdirs = dispatch_pgversion!(pgversion, &pgv::bindings::PGDATA_SUBDIRS[..]);

View File

@@ -416,18 +416,8 @@ fn start_pageserver(
// The storage_broker::connect call needs to happen inside a tokio runtime thread.
let broker_client = WALRECEIVER_RUNTIME
.block_on(async {
let tls_config = storage_broker::ClientTlsConfig::new().ca_certificates(
conf.ssl_ca_certs
.iter()
.map(pem::encode)
.map(storage_broker::Certificate::from_pem),
);
// Note: we do not attempt connecting here (but validate endpoints sanity).
storage_broker::connect(
conf.broker_endpoint.clone(),
conf.broker_keepalive_interval,
tls_config,
)
storage_broker::connect(conf.broker_endpoint.clone(), conf.broker_keepalive_interval)
})
.with_context(|| {
format!(

View File

@@ -17,10 +17,9 @@ use once_cell::sync::OnceCell;
use pageserver_api::config::{DiskUsageEvictionTaskConfig, MaxVectoredReadBytes};
use pageserver_api::models::ImageCompressionAlgorithm;
use pageserver_api::shard::TenantShardId;
use pem::Pem;
use postgres_backend::AuthType;
use remote_storage::{RemotePath, RemoteStorageConfig};
use reqwest::Url;
use reqwest::{Certificate, Url};
use storage_broker::Uri;
use utils::id::{NodeId, TimelineId};
use utils::logging::{LogFormat, SecretString};
@@ -68,8 +67,8 @@ pub struct PageServerConf {
/// Period to reload certificate and private key from files.
/// Default: 60s.
pub ssl_cert_reload_period: Duration,
/// Trusted root CA certificates to use in https APIs in PEM format.
pub ssl_ca_certs: Vec<Pem>,
/// Trusted root CA certificates to use in https APIs.
pub ssl_ca_certs: Vec<Certificate>,
/// Current availability zone. Used for traffic metrics.
pub availability_zone: Option<String>,
@@ -119,13 +118,13 @@ pub struct PageServerConf {
/// A lower value implicitly deprioritizes loading such tenants, vs. other work in the system.
pub concurrent_tenant_warmup: ConfigurableSemaphore,
/// Number of concurrent [`TenantShard::gather_size_inputs`](crate::tenant::TenantShard::gather_size_inputs) allowed.
/// Number of concurrent [`Tenant::gather_size_inputs`](crate::tenant::Tenant::gather_size_inputs) allowed.
pub concurrent_tenant_size_logical_size_queries: ConfigurableSemaphore,
/// Limit of concurrent [`TenantShard::gather_size_inputs`] issued by module `eviction_task`.
/// Limit of concurrent [`Tenant::gather_size_inputs`] issued by module `eviction_task`.
/// The number of permits is the same as `concurrent_tenant_size_logical_size_queries`.
/// See the comment in `eviction_task` for details.
///
/// [`TenantShard::gather_size_inputs`]: crate::tenant::TenantShard::gather_size_inputs
/// [`Tenant::gather_size_inputs`]: crate::tenant::Tenant::gather_size_inputs
pub eviction_task_immitated_concurrent_logical_size_queries: ConfigurableSemaphore,
// How often to collect metrics and send them to the metrics endpoint.
@@ -498,10 +497,7 @@ impl PageServerConf {
ssl_ca_certs: match ssl_ca_file {
Some(ssl_ca_file) => {
let buf = std::fs::read(ssl_ca_file)?;
pem::parse_many(&buf)?
.into_iter()
.filter(|pem| pem.tag() == "CERTIFICATE")
.collect()
Certificate::from_pem_bundle(&buf)?
}
None => Vec::new(),
},
@@ -592,10 +588,10 @@ impl ConfigurableSemaphore {
/// Initializse using a non-zero amount of permits.
///
/// Require a non-zero initial permits, because using permits == 0 is a crude way to disable a
/// feature such as [`TenantShard::gather_size_inputs`]. Otherwise any semaphore using future will
/// feature such as [`Tenant::gather_size_inputs`]. Otherwise any semaphore using future will
/// behave like [`futures::future::pending`], just waiting until new permits are added.
///
/// [`TenantShard::gather_size_inputs`]: crate::tenant::TenantShard::gather_size_inputs
/// [`Tenant::gather_size_inputs`]: crate::tenant::Tenant::gather_size_inputs
pub fn new(initial_permits: NonZeroUsize) -> Self {
ConfigurableSemaphore {
initial_permits,

View File

@@ -24,7 +24,7 @@ use crate::task_mgr::{self, BACKGROUND_RUNTIME, TaskKind};
use crate::tenant::mgr::TenantManager;
use crate::tenant::size::CalculateSyntheticSizeError;
use crate::tenant::tasks::BackgroundLoopKind;
use crate::tenant::{LogicalSizeCalculationCause, TenantShard};
use crate::tenant::{LogicalSizeCalculationCause, Tenant};
mod disk_cache;
mod metrics;
@@ -428,7 +428,7 @@ async fn calculate_synthetic_size_worker(
}
}
async fn calculate_and_log(tenant: &TenantShard, cancel: &CancellationToken, ctx: &RequestContext) {
async fn calculate_and_log(tenant: &Tenant, cancel: &CancellationToken, ctx: &RequestContext) {
const CAUSE: LogicalSizeCalculationCause =
LogicalSizeCalculationCause::ConsumptionMetricsSyntheticSize;

View File

@@ -175,9 +175,9 @@ impl MetricsKey {
.absolute_values()
}
/// [`TenantShard::remote_size`]
/// [`Tenant::remote_size`]
///
/// [`TenantShard::remote_size`]: crate::tenant::TenantShard::remote_size
/// [`Tenant::remote_size`]: crate::tenant::Tenant::remote_size
const fn remote_storage_size(tenant_id: TenantId) -> AbsoluteValueFactory {
MetricsKey {
tenant_id,
@@ -199,9 +199,9 @@ impl MetricsKey {
.absolute_values()
}
/// [`TenantShard::cached_synthetic_size`] as refreshed by [`calculate_synthetic_size_worker`].
/// [`Tenant::cached_synthetic_size`] as refreshed by [`calculate_synthetic_size_worker`].
///
/// [`TenantShard::cached_synthetic_size`]: crate::tenant::TenantShard::cached_synthetic_size
/// [`Tenant::cached_synthetic_size`]: crate::tenant::Tenant::cached_synthetic_size
/// [`calculate_synthetic_size_worker`]: super::calculate_synthetic_size_worker
const fn synthetic_size(tenant_id: TenantId) -> AbsoluteValueFactory {
MetricsKey {
@@ -254,7 +254,7 @@ pub(super) async fn collect_all_metrics(
async fn collect<S>(tenants: S, cache: &Cache, ctx: &RequestContext) -> Vec<NewRawMetric>
where
S: futures::stream::Stream<Item = (TenantId, Arc<crate::tenant::TenantShard>)>,
S: futures::stream::Stream<Item = (TenantId, Arc<crate::tenant::Tenant>)>,
{
let mut current_metrics: Vec<NewRawMetric> = Vec::new();
@@ -263,9 +263,7 @@ where
while let Some((tenant_id, tenant)) = tenants.next().await {
let mut tenant_resident_size = 0;
let timelines = tenant.list_timelines();
let timelines_len = timelines.len();
for timeline in timelines {
for timeline in tenant.list_timelines() {
let timeline_id = timeline.timeline_id;
match TimelineSnapshot::collect(&timeline, ctx) {
@@ -291,11 +289,6 @@ where
tenant_resident_size += timeline.resident_physical_size();
}
if timelines_len == 0 {
// Force set it to 1 byte to avoid not being reported -- all timelines are offloaded.
tenant_resident_size = 1;
}
let snap = TenantSnapshot::collect(&tenant, tenant_resident_size);
snap.to_metrics(tenant_id, Utc::now(), cache, &mut current_metrics);
}
@@ -315,7 +308,7 @@ impl TenantSnapshot {
///
/// `resident_size` is calculated of the timelines we had access to for other metrics, so we
/// cannot just list timelines here.
fn collect(t: &Arc<crate::tenant::TenantShard>, resident_size: u64) -> Self {
fn collect(t: &Arc<crate::tenant::Tenant>, resident_size: u64) -> Self {
TenantSnapshot {
resident_size,
remote_size: t.remote_size(),

View File

@@ -8,7 +8,6 @@ use pageserver_api::upcall_api::{
ReAttachRequest, ReAttachResponse, ReAttachResponseTenant, ValidateRequest,
ValidateRequestTenant, ValidateResponse,
};
use reqwest::Certificate;
use serde::Serialize;
use serde::de::DeserializeOwned;
use tokio_util::sync::CancellationToken;
@@ -77,8 +76,8 @@ impl StorageControllerUpcallClient {
client = client.default_headers(headers);
}
for cert in &conf.ssl_ca_certs {
client = client.add_root_certificate(Certificate::from_der(cert.contents())?);
for ssl_ca_cert in &conf.ssl_ca_certs {
client = client.add_root_certificate(ssl_ca_cert.clone());
}
Ok(Some(Self {

View File

@@ -1873,7 +1873,7 @@ async fn update_tenant_config_handler(
&ShardParameters::default(),
);
crate::tenant::TenantShard::persist_tenant_config(state.conf, &tenant_shard_id, &location_conf)
crate::tenant::Tenant::persist_tenant_config(state.conf, &tenant_shard_id, &location_conf)
.await
.map_err(|e| ApiError::InternalServerError(anyhow::anyhow!(e)))?;
@@ -1917,7 +1917,7 @@ async fn patch_tenant_config_handler(
&ShardParameters::default(),
);
crate::tenant::TenantShard::persist_tenant_config(state.conf, &tenant_shard_id, &location_conf)
crate::tenant::Tenant::persist_tenant_config(state.conf, &tenant_shard_id, &location_conf)
.await
.map_err(|e| ApiError::InternalServerError(anyhow::anyhow!(e)))?;

View File

@@ -49,7 +49,7 @@ use tracing::{info, info_span};
/// backwards-compatible changes to the metadata format.
pub const STORAGE_FORMAT_VERSION: u16 = 3;
pub const DEFAULT_PG_VERSION: u32 = 17;
pub const DEFAULT_PG_VERSION: u32 = 16;
// Magic constants used to identify different kinds of files
pub const IMAGE_FILE_MAGIC: u16 = 0x5A60;

View File

@@ -1086,7 +1086,7 @@ pub(crate) static TIMELINE_EPHEMERAL_BYTES: Lazy<UIntGauge> = Lazy::new(|| {
.expect("Failed to register metric")
});
/// Metrics related to the lifecycle of a [`crate::tenant::TenantShard`] object: things
/// Metrics related to the lifecycle of a [`crate::tenant::Tenant`] object: things
/// like how long it took to load.
///
/// Note that these are process-global metrics, _not_ per-tenant metrics. Per-tenant

View File

@@ -76,7 +76,7 @@ use crate::tenant::timeline::{self, WaitLsnError};
use crate::tenant::{GetTimelineError, PageReconstructError, Timeline};
use crate::{basebackup, timed_after_cancellation};
/// How long we may wait for a [`crate::tenant::mgr::TenantSlot::InProgress`]` and/or a [`crate::tenant::TenantShard`] which
/// How long we may wait for a [`crate::tenant::mgr::TenantSlot::InProgress`]` and/or a [`crate::tenant::Tenant`] which
/// is not yet in state [`TenantState::Active`].
///
/// NB: this is a different value than [`crate::http::routes::ACTIVE_TENANT_TIMEOUT`].
@@ -2394,6 +2394,7 @@ impl PageServerHandler {
full_backup: bool,
gzip: bool,
replica: bool,
lazy_slru_download: bool,
ctx: &RequestContext,
) -> Result<(), QueryError>
where
@@ -2461,6 +2462,7 @@ impl PageServerHandler {
prev_lsn,
full_backup,
replica,
lazy_slru_download,
&ctx,
)
.await
@@ -2484,6 +2486,7 @@ impl PageServerHandler {
prev_lsn,
full_backup,
replica,
lazy_slru_download,
&ctx,
)
.await
@@ -2501,6 +2504,7 @@ impl PageServerHandler {
prev_lsn,
full_backup,
replica,
lazy_slru_download,
&ctx,
)
.await
@@ -2550,7 +2554,7 @@ impl PageServerHandler {
}
}
/// `basebackup tenant timeline [lsn] [--gzip] [--replica]`
/// `basebackup tenant timeline [lsn] [--gzip] [--replica] [--lazy-slru-download]`
#[derive(Debug, Clone, Eq, PartialEq)]
struct BaseBackupCmd {
tenant_id: TenantId,
@@ -2558,6 +2562,7 @@ struct BaseBackupCmd {
lsn: Option<Lsn>,
gzip: bool,
replica: bool,
lazy_slru_download: bool,
}
/// `fullbackup tenant timeline [lsn] [prev_lsn]`
@@ -2690,6 +2695,7 @@ impl BaseBackupCmd {
let mut gzip = false;
let mut replica = false;
let mut lazy_slru_download = false;
for &param in &parameters[flags_parse_from..] {
match param {
@@ -2705,6 +2711,12 @@ impl BaseBackupCmd {
}
replica = true
}
"--lazy-slru-download" => {
if lazy_slru_download {
bail!("duplicate parameter for basebackup command: {param}")
}
lazy_slru_download = true
}
_ => bail!("invalid parameter for basebackup command: {param}"),
}
}
@@ -2714,6 +2726,7 @@ impl BaseBackupCmd {
lsn,
gzip,
replica,
lazy_slru_download,
})
}
}
@@ -2928,6 +2941,7 @@ where
lsn,
gzip,
replica,
lazy_slru_download,
}) => {
tracing::Span::current()
.record("tenant_id", field::display(tenant_id))
@@ -2949,6 +2963,7 @@ where
false,
gzip,
replica,
lazy_slru_download,
&ctx,
)
.await?;
@@ -2986,6 +3001,7 @@ where
true,
false,
false,
false,
&ctx,
)
.await?;
@@ -3120,7 +3136,8 @@ mod tests {
timeline_id,
lsn: None,
gzip: false,
replica: false
replica: false,
lazy_slru_download: false
})
);
let cmd =
@@ -3132,7 +3149,8 @@ mod tests {
timeline_id,
lsn: None,
gzip: true,
replica: false
replica: false,
lazy_slru_download: false
})
);
let cmd =
@@ -3144,7 +3162,8 @@ mod tests {
timeline_id,
lsn: None,
gzip: false,
replica: false
replica: false,
lazy_slru_download: false
})
);
let cmd = PageServiceCmd::parse(&format!("basebackup {tenant_id} {timeline_id} 0/16ABCDE"))
@@ -3156,7 +3175,8 @@ mod tests {
timeline_id,
lsn: Some(Lsn::from_str("0/16ABCDE").unwrap()),
gzip: false,
replica: false
replica: false,
lazy_slru_download: false
})
);
let cmd = PageServiceCmd::parse(&format!(
@@ -3170,7 +3190,23 @@ mod tests {
timeline_id,
lsn: None,
gzip: true,
replica: true
replica: true,
lazy_slru_download: false
})
);
let cmd = PageServiceCmd::parse(&format!(
"basebackup {tenant_id} {timeline_id} --replica --gzip --lazy-slru-download"
))
.unwrap();
assert_eq!(
cmd,
PageServiceCmd::BaseBackup(BaseBackupCmd {
tenant_id,
timeline_id,
lsn: None,
gzip: true,
replica: true,
lazy_slru_download: true
})
);
let cmd = PageServiceCmd::parse(&format!(
@@ -3184,7 +3220,8 @@ mod tests {
timeline_id,
lsn: Some(Lsn::from_str("0/16ABCDE").unwrap()),
gzip: true,
replica: true
replica: true,
lazy_slru_download: false
})
);
let cmd = PageServiceCmd::parse(&format!("fullbackup {tenant_id} {timeline_id}")).unwrap();

View File

@@ -158,7 +158,7 @@ pub struct TenantSharedResources {
pub l0_flush_global_state: L0FlushGlobalState,
}
/// A [`TenantShard`] is really an _attached_ tenant. The configuration
/// A [`Tenant`] is really an _attached_ tenant. The configuration
/// for an attached tenant is a subset of the [`LocationConf`], represented
/// in this struct.
#[derive(Clone)]
@@ -245,7 +245,7 @@ pub(crate) enum SpawnMode {
///
/// Tenant consists of multiple timelines. Keep them in a hash table.
///
pub struct TenantShard {
pub struct Tenant {
// Global pageserver config parameters
pub conf: &'static PageServerConf,
@@ -267,7 +267,7 @@ pub struct TenantShard {
shard_identity: ShardIdentity,
/// The remote storage generation, used to protect S3 objects from split-brain.
/// Does not change over the lifetime of the [`TenantShard`] object.
/// Does not change over the lifetime of the [`Tenant`] object.
///
/// This duplicates the generation stored in LocationConf, but that structure is mutable:
/// this copy enforces the invariant that generatio doesn't change during a Tenant's lifetime.
@@ -309,7 +309,7 @@ pub struct TenantShard {
// Access to global deletion queue for when this tenant wants to schedule a deletion
deletion_queue_client: DeletionQueueClient,
/// Cached logical sizes updated updated on each [`TenantShard::gather_size_inputs`].
/// Cached logical sizes updated updated on each [`Tenant::gather_size_inputs`].
cached_logical_sizes: tokio::sync::Mutex<HashMap<(TimelineId, Lsn), u64>>,
cached_synthetic_tenant_size: Arc<AtomicU64>,
@@ -337,12 +337,12 @@ pub struct TenantShard {
// Timelines' cancellation token.
pub(crate) cancel: CancellationToken,
// Users of the TenantShard such as the page service must take this Gate to avoid
// trying to use a TenantShard which is shutting down.
// Users of the Tenant such as the page service must take this Gate to avoid
// trying to use a Tenant which is shutting down.
pub(crate) gate: Gate,
/// Throttle applied at the top of [`Timeline::get`].
/// All [`TenantShard::timelines`] of a given [`TenantShard`] instance share the same [`throttle::Throttle`] instance.
/// All [`Tenant::timelines`] of a given [`Tenant`] instance share the same [`throttle::Throttle`] instance.
pub(crate) pagestream_throttle: Arc<throttle::Throttle>,
pub(crate) pagestream_throttle_metrics: Arc<crate::metrics::tenant_throttling::Pagestream>,
@@ -362,7 +362,7 @@ pub struct TenantShard {
l0_flush_global_state: L0FlushGlobalState,
}
impl std::fmt::Debug for TenantShard {
impl std::fmt::Debug for Tenant {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{} ({})", self.tenant_shard_id, self.current_state())
}
@@ -841,7 +841,7 @@ impl Debug for SetStoppingError {
}
}
/// Arguments to [`TenantShard::create_timeline`].
/// Arguments to [`Tenant::create_timeline`].
///
/// Not usable as an idempotency key for timeline creation because if [`CreateTimelineParamsBranch::ancestor_start_lsn`]
/// is `None`, the result of the timeline create call is not deterministic.
@@ -876,7 +876,7 @@ pub(crate) struct CreateTimelineParamsImportPgdata {
pub(crate) idempotency_key: import_pgdata::index_part_format::IdempotencyKey,
}
/// What is used to determine idempotency of a [`TenantShard::create_timeline`] call in [`TenantShard::start_creating_timeline`] in [`TenantShard::start_creating_timeline`].
/// What is used to determine idempotency of a [`Tenant::create_timeline`] call in [`Tenant::start_creating_timeline`] in [`Tenant::start_creating_timeline`].
///
/// Each [`Timeline`] object holds [`Self`] as an immutable property in [`Timeline::create_idempotency`].
///
@@ -914,7 +914,7 @@ pub(crate) struct CreatingTimelineIdempotencyImportPgdata {
idempotency_key: import_pgdata::index_part_format::IdempotencyKey,
}
/// What is returned by [`TenantShard::start_creating_timeline`].
/// What is returned by [`Tenant::start_creating_timeline`].
#[must_use]
enum StartCreatingTimelineResult {
CreateGuard(TimelineCreateGuard),
@@ -943,13 +943,13 @@ struct TimelineInitAndSyncNeedsSpawnImportPgdata {
guard: TimelineCreateGuard,
}
/// What is returned by [`TenantShard::create_timeline`].
/// What is returned by [`Tenant::create_timeline`].
enum CreateTimelineResult {
Created(Arc<Timeline>),
Idempotent(Arc<Timeline>),
/// IMPORTANT: This [`Arc<Timeline>`] object is not in [`TenantShard::timelines`] when
/// IMPORTANT: This [`Arc<Timeline>`] object is not in [`Tenant::timelines`] when
/// we return this result, nor will this concrete object ever be added there.
/// Cf method comment on [`TenantShard::create_timeline_import_pgdata`].
/// Cf method comment on [`Tenant::create_timeline_import_pgdata`].
ImportSpawned(Arc<Timeline>),
}
@@ -1082,7 +1082,7 @@ pub(crate) enum LoadConfigError {
NotFound(Utf8PathBuf),
}
impl TenantShard {
impl Tenant {
/// Yet another helper for timeline initialization.
///
/// - Initializes the Timeline struct and inserts it into the tenant's hash map
@@ -1303,7 +1303,7 @@ impl TenantShard {
init_order: Option<InitializationOrder>,
mode: SpawnMode,
ctx: &RequestContext,
) -> Result<Arc<TenantShard>, GlobalShutDown> {
) -> Result<Arc<Tenant>, GlobalShutDown> {
let wal_redo_manager =
WalRedoManager::new(PostgresRedoManager::new(conf, tenant_shard_id))?;
@@ -1317,7 +1317,7 @@ impl TenantShard {
let attach_mode = attached_conf.location.attach_mode;
let generation = attached_conf.location.generation;
let tenant = Arc::new(TenantShard::new(
let tenant = Arc::new(Tenant::new(
TenantState::Attaching,
conf,
attached_conf,
@@ -1334,7 +1334,7 @@ impl TenantShard {
let attach_gate_guard = tenant
.gate
.enter()
.expect("We just created the TenantShard: nothing else can have shut it down yet");
.expect("We just created the Tenant: nothing else can have shut it down yet");
// Do all the hard work in the background
let tenant_clone = Arc::clone(&tenant);
@@ -1362,7 +1362,7 @@ impl TenantShard {
}
}
fn make_broken_or_stopping(t: &TenantShard, err: anyhow::Error) {
fn make_broken_or_stopping(t: &Tenant, err: anyhow::Error) {
t.state.send_modify(|state| match state {
// TODO: the old code alluded to DeleteTenantFlow sometimes setting
// TenantState::Stopping before we get here, but this may be outdated.
@@ -1627,7 +1627,7 @@ impl TenantShard {
/// No background tasks are started as part of this routine.
///
async fn attach(
self: &Arc<TenantShard>,
self: &Arc<Tenant>,
preload: Option<TenantPreload>,
ctx: &RequestContext,
) -> anyhow::Result<()> {
@@ -1957,7 +1957,7 @@ impl TenantShard {
}
async fn load_timelines_metadata(
self: &Arc<TenantShard>,
self: &Arc<Tenant>,
timeline_ids: HashSet<TimelineId>,
remote_storage: &GenericRemoteStorage,
heatmap: Option<(HeatMapTenant, std::time::Instant)>,
@@ -2028,7 +2028,7 @@ impl TenantShard {
}
fn load_timeline_metadata(
self: &Arc<TenantShard>,
self: &Arc<Tenant>,
timeline_id: TimelineId,
remote_storage: GenericRemoteStorage,
previous_heatmap: Option<PreviousHeatmap>,
@@ -2429,14 +2429,14 @@ impl TenantShard {
/// This is used by tests & import-from-basebackup.
///
/// The returned [`UninitializedTimeline`] contains no data nor metadata and it is in
/// a state that will fail [`TenantShard::load_remote_timeline`] because `disk_consistent_lsn=Lsn(0)`.
/// a state that will fail [`Tenant::load_remote_timeline`] because `disk_consistent_lsn=Lsn(0)`.
///
/// The caller is responsible for getting the timeline into a state that will be accepted
/// by [`TenantShard::load_remote_timeline`] / [`TenantShard::attach`].
/// by [`Tenant::load_remote_timeline`] / [`Tenant::attach`].
/// Then they may call [`UninitializedTimeline::finish_creation`] to add the timeline
/// to the [`TenantShard::timelines`].
/// to the [`Tenant::timelines`].
///
/// Tests should use `TenantShard::create_test_timeline` to set up the minimum required metadata keys.
/// Tests should use `Tenant::create_test_timeline` to set up the minimum required metadata keys.
pub(crate) async fn create_empty_timeline(
self: &Arc<Self>,
new_timeline_id: TimelineId,
@@ -2584,7 +2584,7 @@ impl TenantShard {
/// the same timeline ID already exists, returns CreateTimelineError::AlreadyExists.
#[allow(clippy::too_many_arguments)]
pub(crate) async fn create_timeline(
self: &Arc<TenantShard>,
self: &Arc<Tenant>,
params: CreateTimelineParams,
broker_client: storage_broker::BrokerClientChannel,
ctx: &RequestContext,
@@ -2751,13 +2751,13 @@ impl TenantShard {
Ok(activated_timeline)
}
/// The returned [`Arc<Timeline>`] is NOT in the [`TenantShard::timelines`] map until the import
/// The returned [`Arc<Timeline>`] is NOT in the [`Tenant::timelines`] map until the import
/// completes in the background. A DIFFERENT [`Arc<Timeline>`] will be inserted into the
/// [`TenantShard::timelines`] map when the import completes.
/// [`Tenant::timelines`] map when the import completes.
/// We only return an [`Arc<Timeline>`] here so the API handler can create a [`pageserver_api::models::TimelineInfo`]
/// for the response.
async fn create_timeline_import_pgdata(
self: &Arc<Self>,
self: &Arc<Tenant>,
params: CreateTimelineParamsImportPgdata,
activate: ActivateTimelineArgs,
ctx: &RequestContext,
@@ -2854,7 +2854,7 @@ impl TenantShard {
#[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), timeline_id=%timeline.timeline_id))]
async fn create_timeline_import_pgdata_task(
self: Arc<TenantShard>,
self: Arc<Tenant>,
timeline: Arc<Timeline>,
index_part: import_pgdata::index_part_format::Root,
activate: ActivateTimelineArgs,
@@ -2882,7 +2882,7 @@ impl TenantShard {
}
async fn create_timeline_import_pgdata_task_impl(
self: Arc<TenantShard>,
self: Arc<Tenant>,
timeline: Arc<Timeline>,
index_part: import_pgdata::index_part_format::Root,
activate: ActivateTimelineArgs,
@@ -2899,10 +2899,10 @@ impl TenantShard {
// Reload timeline from remote.
// This proves that the remote state is attachable, and it reuses the code.
//
// TODO: think about whether this is safe to do with concurrent TenantShard::shutdown.
// TODO: think about whether this is safe to do with concurrent Tenant::shutdown.
// timeline_create_guard hols the tenant gate open, so, shutdown cannot _complete_ until we exit.
// But our activate() call might launch new background tasks after TenantShard::shutdown
// already went past shutting down the TenantShard::timelines, which this timeline here is no part of.
// But our activate() call might launch new background tasks after Tenant::shutdown
// already went past shutting down the Tenant::timelines, which this timeline here is no part of.
// I think the same problem exists with the bootstrap & branch mgmt API tasks (tenant shutting
// down while bootstrapping/branching + activating), but, the race condition is much more likely
// to manifest because of the long runtime of this import task.
@@ -2917,7 +2917,7 @@ impl TenantShard {
// };
let timeline_id = timeline.timeline_id;
// load from object storage like TenantShard::attach does
// load from object storage like Tenant::attach does
let resources = self.build_timeline_resources(timeline_id);
let index_part = resources
.remote_client
@@ -3938,7 +3938,7 @@ enum ActivateTimelineArgs {
No,
}
impl TenantShard {
impl Tenant {
pub fn tenant_specific_overrides(&self) -> pageserver_api::models::TenantConfig {
self.tenant_conf.load().tenant_conf.clone()
}
@@ -4096,7 +4096,7 @@ impl TenantShard {
update: F,
) -> anyhow::Result<pageserver_api::models::TenantConfig> {
// Use read-copy-update in order to avoid overwriting the location config
// state if this races with [`TenantShard::set_new_location_config`]. Note that
// state if this races with [`Tenant::set_new_location_config`]. Note that
// this race is not possible if both request types come from the storage
// controller (as they should!) because an exclusive op lock is required
// on the storage controller side.
@@ -4219,7 +4219,7 @@ impl TenantShard {
Ok((timeline, timeline_ctx))
}
/// [`TenantShard::shutdown`] must be called before dropping the returned [`TenantShard`] object
/// [`Tenant::shutdown`] must be called before dropping the returned [`Tenant`] object
/// to ensure proper cleanup of background tasks and metrics.
//
// Allow too_many_arguments because a constructor's argument list naturally grows with the
@@ -4235,7 +4235,7 @@ impl TenantShard {
remote_storage: GenericRemoteStorage,
deletion_queue_client: DeletionQueueClient,
l0_flush_global_state: L0FlushGlobalState,
) -> TenantShard {
) -> Tenant {
debug_assert!(
!attached_conf.location.generation.is_none() || conf.control_plane_api.is_none()
);
@@ -4295,7 +4295,7 @@ impl TenantShard {
}
});
TenantShard {
Tenant {
tenant_shard_id,
shard_identity,
generation: attached_conf.location.generation,
@@ -4330,7 +4330,7 @@ impl TenantShard {
cancel: CancellationToken::default(),
gate: Gate::default(),
pagestream_throttle: Arc::new(throttle::Throttle::new(
TenantShard::get_pagestream_throttle_config(conf, &attached_conf.tenant_conf),
Tenant::get_pagestream_throttle_config(conf, &attached_conf.tenant_conf),
)),
pagestream_throttle_metrics: Arc::new(
crate::metrics::tenant_throttling::Pagestream::new(&tenant_shard_id),
@@ -4466,11 +4466,11 @@ impl TenantShard {
// Perform GC for each timeline.
//
// Note that we don't hold the `TenantShard::gc_cs` lock here because we don't want to delay the
// Note that we don't hold the `Tenant::gc_cs` lock here because we don't want to delay the
// branch creation task, which requires the GC lock. A GC iteration can run concurrently
// with branch creation.
//
// See comments in [`TenantShard::branch_timeline`] for more information about why branch
// See comments in [`Tenant::branch_timeline`] for more information about why branch
// creation task can run concurrently with timeline's GC iteration.
for timeline in gc_timelines {
if cancel.is_cancelled() {
@@ -4500,7 +4500,7 @@ impl TenantShard {
/// Refreshes the Timeline::gc_info for all timelines, returning the
/// vector of timelines which have [`Timeline::get_last_record_lsn`] past
/// [`TenantShard::get_gc_horizon`].
/// [`Tenant::get_gc_horizon`].
///
/// This is usually executed as part of periodic gc, but can now be triggered more often.
pub(crate) async fn refresh_gc_info(
@@ -5499,7 +5499,7 @@ impl TenantShard {
}
}
// The flushes we did above were just writes, but the TenantShard might have had
// The flushes we did above were just writes, but the Tenant might have had
// pending deletions as well from recent compaction/gc: we want to flush those
// as well. This requires flushing the global delete queue. This is cheap
// because it's typically a no-op.
@@ -5517,7 +5517,7 @@ impl TenantShard {
/// How much local storage would this tenant like to have? It can cope with
/// less than this (via eviction and on-demand downloads), but this function enables
/// the TenantShard to advertise how much storage it would prefer to have to provide fast I/O
/// the Tenant to advertise how much storage it would prefer to have to provide fast I/O
/// by keeping important things on local disk.
///
/// This is a heuristic, not a guarantee: tenants that are long-idle will actually use less
@@ -5540,11 +5540,11 @@ impl TenantShard {
/// manifest in `Self::remote_tenant_manifest`.
///
/// TODO: instead of requiring callers to remember to call `maybe_upload_tenant_manifest` after
/// changing any `TenantShard` state that's included in the manifest, consider making the manifest
/// changing any `Tenant` state that's included in the manifest, consider making the manifest
/// the authoritative source of data with an API that automatically uploads on changes. Revisit
/// this when the manifest is more widely used and we have a better idea of the data model.
pub(crate) async fn maybe_upload_tenant_manifest(&self) -> Result<(), TenantManifestError> {
// Multiple tasks may call this function concurrently after mutating the TenantShard runtime
// Multiple tasks may call this function concurrently after mutating the Tenant runtime
// state, affecting the manifest generated by `build_tenant_manifest`. We use an async mutex
// to serialize these callers. `eq_ignoring_version` acts as a slightly inefficient but
// simple coalescing mechanism.
@@ -5812,7 +5812,7 @@ pub(crate) mod harness {
info_span!("TenantHarness", tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug())
}
pub(crate) async fn load(&self) -> (Arc<TenantShard>, RequestContext) {
pub(crate) async fn load(&self) -> (Arc<Tenant>, RequestContext) {
let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error)
.with_scope_unit_test();
(
@@ -5827,10 +5827,10 @@ pub(crate) mod harness {
pub(crate) async fn do_try_load(
&self,
ctx: &RequestContext,
) -> anyhow::Result<Arc<TenantShard>> {
) -> anyhow::Result<Arc<Tenant>> {
let walredo_mgr = Arc::new(WalRedoManager::from(TestRedoManager));
let tenant = Arc::new(TenantShard::new(
let tenant = Arc::new(Tenant::new(
TenantState::Attaching,
self.conf,
AttachedTenantConf::try_from(LocationConf::attached_single(
@@ -6046,7 +6046,7 @@ mod tests {
#[cfg(feature = "testing")]
#[allow(clippy::too_many_arguments)]
async fn randomize_timeline(
tenant: &Arc<TenantShard>,
tenant: &Arc<Tenant>,
new_timeline_id: TimelineId,
pg_version: u32,
spec: TestTimelineSpecification,
@@ -6936,7 +6936,7 @@ mod tests {
}
async fn bulk_insert_compact_gc(
tenant: &TenantShard,
tenant: &Tenant,
timeline: &Arc<Timeline>,
ctx: &RequestContext,
lsn: Lsn,
@@ -6948,7 +6948,7 @@ mod tests {
}
async fn bulk_insert_maybe_compact_gc(
tenant: &TenantShard,
tenant: &Tenant,
timeline: &Arc<Timeline>,
ctx: &RequestContext,
mut lsn: Lsn,
@@ -7858,7 +7858,7 @@ mod tests {
let (tline, _ctx) = tenant
.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)
.await?;
// Leave the timeline ID in [`TenantShard::timelines_creating`] to exclude attempting to create it again
// Leave the timeline ID in [`Tenant::timelines_creating`] to exclude attempting to create it again
let raw_tline = tline.raw_timeline().unwrap();
raw_tline
.shutdown(super::timeline::ShutdownMode::Hard)

View File

@@ -37,63 +37,6 @@ pub struct CompressionInfo {
pub compressed_size: Option<usize>,
}
/// A blob header, with header+data length and compression info.
///
/// TODO: use this more widely, and add an encode() method too.
/// TODO: document the header format.
#[derive(Clone, Copy, Default)]
pub struct Header {
pub header_len: usize,
pub data_len: usize,
pub compression_bits: u8,
}
impl Header {
/// Decodes a header from a byte slice.
pub fn decode(bytes: &[u8]) -> Result<Self, std::io::Error> {
let Some(&first_header_byte) = bytes.first() else {
return Err(std::io::Error::new(
std::io::ErrorKind::InvalidData,
"zero-length blob header",
));
};
// If the first bit is 0, this is just a 1-byte length prefix up to 128 bytes.
if first_header_byte < 0x80 {
return Ok(Self {
header_len: 1, // by definition
data_len: first_header_byte as usize,
compression_bits: BYTE_UNCOMPRESSED,
});
}
// Otherwise, this is a 4-byte header containing compression information and length.
const HEADER_LEN: usize = 4;
let mut header_buf: [u8; HEADER_LEN] = bytes[0..HEADER_LEN].try_into().map_err(|_| {
std::io::Error::new(
std::io::ErrorKind::InvalidData,
format!("blob header too short: {bytes:?}"),
)
})?;
// TODO: verify the compression bits and convert to an enum.
let compression_bits = header_buf[0] & LEN_COMPRESSION_BIT_MASK;
header_buf[0] &= !LEN_COMPRESSION_BIT_MASK;
let data_len = u32::from_be_bytes(header_buf) as usize;
Ok(Self {
header_len: HEADER_LEN,
data_len,
compression_bits,
})
}
/// Returns the total header+data length.
pub fn total_len(&self) -> usize {
self.header_len + self.data_len
}
}
impl BlockCursor<'_> {
/// Read a blob into a new buffer.
pub async fn read_blob(
@@ -446,34 +389,6 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
};
(srcbuf, res.map(|_| (offset, compression_info)))
}
/// Writes a raw blob containing both header and data, returning its offset.
pub(crate) async fn write_blob_raw<Buf: IoBuf + Send>(
&mut self,
raw_with_header: FullSlice<Buf>,
ctx: &RequestContext,
) -> (FullSlice<Buf>, Result<u64, Error>) {
// Verify the header, to ensure we don't write invalid/corrupt data.
let header = match Header::decode(&raw_with_header) {
Ok(header) => header,
Err(err) => return (raw_with_header, Err(err)),
};
if raw_with_header.len() != header.total_len() {
let header_total_len = header.total_len();
let raw_len = raw_with_header.len();
return (
raw_with_header,
Err(std::io::Error::new(
std::io::ErrorKind::InvalidData,
format!("header length mismatch: {header_total_len} != {raw_len}"),
)),
);
}
let offset = self.offset;
let (raw_with_header, result) = self.write_all(raw_with_header, ctx).await;
(raw_with_header, result.map(|_| offset))
}
}
impl BlobWriter<true> {

View File

@@ -564,9 +564,8 @@ mod tests {
Lsn(0),
Lsn(0),
Lsn(0),
// Updating this version to 17 will cause the test to fail at the
// next assert_eq!().
16,
// Any version will do here, so use the default
crate::DEFAULT_PG_VERSION,
);
let expected_bytes = vec![
/* TimelineMetadataHeader */

View File

@@ -52,9 +52,7 @@ use crate::tenant::config::{
use crate::tenant::span::debug_assert_current_span_has_tenant_id;
use crate::tenant::storage_layer::inmemory_layer;
use crate::tenant::timeline::ShutdownMode;
use crate::tenant::{
AttachedTenantConf, GcError, LoadConfigError, SpawnMode, TenantShard, TenantState,
};
use crate::tenant::{AttachedTenantConf, GcError, LoadConfigError, SpawnMode, Tenant, TenantState};
use crate::virtual_file::MaybeFatalIo;
use crate::{InitializationOrder, TEMP_FILE_SUFFIX};
@@ -69,7 +67,7 @@ use crate::{InitializationOrder, TEMP_FILE_SUFFIX};
/// having a properly acquired generation (Secondary doesn't need a generation)
#[derive(Clone)]
pub(crate) enum TenantSlot {
Attached(Arc<TenantShard>),
Attached(Arc<Tenant>),
Secondary(Arc<SecondaryTenant>),
/// In this state, other administrative operations acting on the TenantId should
/// block, or return a retry indicator equivalent to HTTP 503.
@@ -88,7 +86,7 @@ impl std::fmt::Debug for TenantSlot {
impl TenantSlot {
/// Return the `Tenant` in this slot if attached, else None
fn get_attached(&self) -> Option<&Arc<TenantShard>> {
fn get_attached(&self) -> Option<&Arc<Tenant>> {
match self {
Self::Attached(t) => Some(t),
Self::Secondary(_) => None,
@@ -166,7 +164,7 @@ impl TenantStartupMode {
/// Result type for looking up a TenantId to a specific shard
pub(crate) enum ShardResolveResult {
NotFound,
Found(Arc<TenantShard>),
Found(Arc<Tenant>),
// Wait for this barrrier, then query again
InProgress(utils::completion::Barrier),
}
@@ -175,7 +173,7 @@ impl TenantsMap {
/// Convenience function for typical usage, where we want to get a `Tenant` object, for
/// working with attached tenants. If the TenantId is in the map but in Secondary state,
/// None is returned.
pub(crate) fn get(&self, tenant_shard_id: &TenantShardId) -> Option<&Arc<TenantShard>> {
pub(crate) fn get(&self, tenant_shard_id: &TenantShardId) -> Option<&Arc<Tenant>> {
match self {
TenantsMap::Initializing => None,
TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => {
@@ -412,7 +410,7 @@ fn load_tenant_config(
return None;
}
Some(TenantShard::load_tenant_config(conf, &tenant_shard_id))
Some(Tenant::load_tenant_config(conf, &tenant_shard_id))
}
/// Initial stage of load: walk the local tenants directory, clean up any temp files,
@@ -608,8 +606,7 @@ pub async fn init_tenant_mgr(
// Presence of a generation number implies attachment: attach the tenant
// if it wasn't already, and apply the generation number.
config_write_futs.push(async move {
let r =
TenantShard::persist_tenant_config(conf, &tenant_shard_id, &location_conf).await;
let r = Tenant::persist_tenant_config(conf, &tenant_shard_id, &location_conf).await;
(tenant_shard_id, location_conf, r)
});
}
@@ -697,7 +694,7 @@ fn tenant_spawn(
init_order: Option<InitializationOrder>,
mode: SpawnMode,
ctx: &RequestContext,
) -> Result<Arc<TenantShard>, GlobalShutDown> {
) -> Result<Arc<Tenant>, GlobalShutDown> {
// All these conditions should have been satisfied by our caller: the tenant dir exists, is a well formed
// path, and contains a configuration file. Assertions that do synchronous I/O are limited to debug mode
// to avoid impacting prod runtime performance.
@@ -709,7 +706,7 @@ fn tenant_spawn(
.unwrap()
);
TenantShard::spawn(
Tenant::spawn(
conf,
tenant_shard_id,
resources,
@@ -886,12 +883,12 @@ impl TenantManager {
/// Gets the attached tenant from the in-memory data, erroring if it's absent, in secondary mode, or currently
/// undergoing a state change (i.e. slot is InProgress).
///
/// The return TenantShard is not guaranteed to be active: check its status after obtaing it, or
/// use [`TenantShard::wait_to_become_active`] before using it if you will do I/O on it.
/// The return Tenant is not guaranteed to be active: check its status after obtaing it, or
/// use [`Tenant::wait_to_become_active`] before using it if you will do I/O on it.
pub(crate) fn get_attached_tenant_shard(
&self,
tenant_shard_id: TenantShardId,
) -> Result<Arc<TenantShard>, GetTenantError> {
) -> Result<Arc<Tenant>, GetTenantError> {
let locked = self.tenants.read().unwrap();
let peek_slot = tenant_map_peek_slot(&locked, &tenant_shard_id, TenantSlotPeekMode::Read)?;
@@ -940,12 +937,12 @@ impl TenantManager {
flush: Option<Duration>,
mut spawn_mode: SpawnMode,
ctx: &RequestContext,
) -> Result<Option<Arc<TenantShard>>, UpsertLocationError> {
) -> Result<Option<Arc<Tenant>>, UpsertLocationError> {
debug_assert_current_span_has_tenant_id();
info!("configuring tenant location to state {new_location_config:?}");
enum FastPathModified {
Attached(Arc<TenantShard>),
Attached(Arc<Tenant>),
Secondary(Arc<SecondaryTenant>),
}
@@ -1002,13 +999,9 @@ impl TenantManager {
// phase of writing config and/or waiting for flush, before returning.
match fast_path_taken {
Some(FastPathModified::Attached(tenant)) => {
TenantShard::persist_tenant_config(
self.conf,
&tenant_shard_id,
&new_location_config,
)
.await
.fatal_err("write tenant shard config");
Tenant::persist_tenant_config(self.conf, &tenant_shard_id, &new_location_config)
.await
.fatal_err("write tenant shard config");
// Transition to AttachedStale means we may well hold a valid generation
// still, and have been requested to go stale as part of a migration. If
@@ -1037,13 +1030,9 @@ impl TenantManager {
return Ok(Some(tenant));
}
Some(FastPathModified::Secondary(_secondary_tenant)) => {
TenantShard::persist_tenant_config(
self.conf,
&tenant_shard_id,
&new_location_config,
)
.await
.fatal_err("write tenant shard config");
Tenant::persist_tenant_config(self.conf, &tenant_shard_id, &new_location_config)
.await
.fatal_err("write tenant shard config");
return Ok(None);
}
@@ -1133,7 +1122,7 @@ impl TenantManager {
// Before activating either secondary or attached mode, persist the
// configuration, so that on restart we will re-attach (or re-start
// secondary) on the tenant.
TenantShard::persist_tenant_config(self.conf, &tenant_shard_id, &new_location_config)
Tenant::persist_tenant_config(self.conf, &tenant_shard_id, &new_location_config)
.await
.fatal_err("write tenant shard config");
@@ -1273,7 +1262,7 @@ impl TenantManager {
let tenant_path = self.conf.tenant_path(&tenant_shard_id);
let timelines_path = self.conf.timelines_path(&tenant_shard_id);
let config = TenantShard::load_tenant_config(self.conf, &tenant_shard_id)?;
let config = Tenant::load_tenant_config(self.conf, &tenant_shard_id)?;
if drop_cache {
tracing::info!("Dropping local file cache");
@@ -1308,7 +1297,7 @@ impl TenantManager {
Ok(())
}
pub(crate) fn get_attached_active_tenant_shards(&self) -> Vec<Arc<TenantShard>> {
pub(crate) fn get_attached_active_tenant_shards(&self) -> Vec<Arc<Tenant>> {
let locked = self.tenants.read().unwrap();
match &*locked {
TenantsMap::Initializing => Vec::new(),
@@ -1457,7 +1446,7 @@ impl TenantManager {
#[instrument(skip_all, fields(tenant_id=%tenant.get_tenant_shard_id().tenant_id, shard_id=%tenant.get_tenant_shard_id().shard_slug(), new_shard_count=%new_shard_count.literal()))]
pub(crate) async fn shard_split(
&self,
tenant: Arc<TenantShard>,
tenant: Arc<Tenant>,
new_shard_count: ShardCount,
new_stripe_size: Option<ShardStripeSize>,
ctx: &RequestContext,
@@ -1487,7 +1476,7 @@ impl TenantManager {
pub(crate) async fn do_shard_split(
&self,
tenant: Arc<TenantShard>,
tenant: Arc<Tenant>,
new_shard_count: ShardCount,
new_stripe_size: Option<ShardStripeSize>,
ctx: &RequestContext,
@@ -1714,7 +1703,7 @@ impl TenantManager {
/// For each resident layer in the parent shard, we will hard link it into all of the child shards.
async fn shard_split_hardlink(
&self,
parent_shard: &TenantShard,
parent_shard: &Tenant,
child_shards: Vec<TenantShardId>,
) -> anyhow::Result<()> {
debug_assert_current_span_has_tenant_id();
@@ -1999,7 +1988,7 @@ impl TenantManager {
}
let tenant_path = self.conf.tenant_path(&tenant_shard_id);
let config = TenantShard::load_tenant_config(self.conf, &tenant_shard_id)
let config = Tenant::load_tenant_config(self.conf, &tenant_shard_id)
.map_err(|e| Error::DetachReparent(e.into()))?;
let shard_identity = config.shard;

View File

@@ -133,7 +133,7 @@
//! - Initiate upload queue with that [`IndexPart`].
//! - Reschedule all lost operations by comparing the local filesystem state
//! and remote state as per [`IndexPart`]. This is done in
//! [`TenantShard::timeline_init_and_sync`].
//! [`Tenant::timeline_init_and_sync`].
//!
//! Note that if we crash during file deletion between the index update
//! that removes the file from the list of files, and deleting the remote file,
@@ -171,7 +171,7 @@
//! If no remote storage configuration is provided, the [`RemoteTimelineClient`] is
//! not created and the uploads are skipped.
//!
//! [`TenantShard::timeline_init_and_sync`]: super::TenantShard::timeline_init_and_sync
//! [`Tenant::timeline_init_and_sync`]: super::Tenant::timeline_init_and_sync
//! [`Timeline::load_layer_map`]: super::Timeline::load_layer_map
pub(crate) mod download;
@@ -2743,7 +2743,7 @@ mod tests {
use crate::tenant::config::AttachmentMode;
use crate::tenant::harness::{TIMELINE_ID, TenantHarness};
use crate::tenant::storage_layer::layer::local_layer_path;
use crate::tenant::{TenantShard, Timeline};
use crate::tenant::{Tenant, Timeline};
pub(super) fn dummy_contents(name: &str) -> Vec<u8> {
format!("contents for {name}").into()
@@ -2796,7 +2796,7 @@ mod tests {
struct TestSetup {
harness: TenantHarness,
tenant: Arc<TenantShard>,
tenant: Arc<Tenant>,
timeline: Arc<Timeline>,
tenant_ctx: RequestContext,
}

View File

@@ -452,7 +452,7 @@ async fn do_download_index_part(
/// generation (normal case when migrating/restarting). Only if both of these return 404 do we fall back
/// to listing objects.
///
/// * `my_generation`: the value of `[crate::tenant::TenantShard::generation]`
/// * `my_generation`: the value of `[crate::tenant::Tenant::generation]`
/// * `what`: for logging, what object are we downloading
/// * `prefix`: when listing objects, use this prefix (i.e. the part of the object path before the generation)
/// * `do_download`: a GET of the object in a particular generation, which should **retry indefinitely** unless

View File

@@ -21,7 +21,7 @@ use super::scheduler::{
use super::{CommandRequest, SecondaryTenantError, UploadCommand};
use crate::TEMP_FILE_SUFFIX;
use crate::metrics::SECONDARY_MODE;
use crate::tenant::TenantShard;
use crate::tenant::Tenant;
use crate::tenant::config::AttachmentMode;
use crate::tenant::mgr::{GetTenantError, TenantManager};
use crate::tenant::remote_timeline_client::remote_heatmap_path;
@@ -74,7 +74,7 @@ impl RunningJob for WriteInProgress {
}
struct UploadPending {
tenant: Arc<TenantShard>,
tenant: Arc<Tenant>,
last_upload: Option<LastUploadState>,
target_time: Option<Instant>,
period: Option<Duration>,
@@ -106,7 +106,7 @@ impl scheduler::Completion for WriteComplete {
struct UploaderTenantState {
// This Weak only exists to enable culling idle instances of this type
// when the Tenant has been deallocated.
tenant: Weak<TenantShard>,
tenant: Weak<Tenant>,
/// Digest of the serialized heatmap that we last successfully uploaded
last_upload_state: Option<LastUploadState>,
@@ -357,7 +357,7 @@ struct LastUploadState {
/// of the object we would have uploaded.
async fn upload_tenant_heatmap(
remote_storage: GenericRemoteStorage,
tenant: &Arc<TenantShard>,
tenant: &Arc<Tenant>,
last_upload: Option<LastUploadState>,
) -> Result<UploadHeatmapOutcome, UploadHeatmapError> {
debug_assert_current_span_has_tenant_id();

View File

@@ -360,7 +360,7 @@ where
/// Periodic execution phase: inspect all attached tenants and schedule any work they require.
///
/// The type in `tenants` should be a tenant-like structure, e.g. [`crate::tenant::TenantShard`] or [`crate::tenant::secondary::SecondaryTenant`]
/// The type in `tenants` should be a tenant-like structure, e.g. [`crate::tenant::Tenant`] or [`crate::tenant::secondary::SecondaryTenant`]
///
/// This function resets the pending list: it is assumed that the caller may change their mind about
/// which tenants need work between calls to schedule_iteration.

View File

@@ -12,7 +12,7 @@ use tracing::*;
use utils::id::TimelineId;
use utils::lsn::Lsn;
use super::{GcError, LogicalSizeCalculationCause, TenantShard};
use super::{GcError, LogicalSizeCalculationCause, Tenant};
use crate::context::RequestContext;
use crate::pgdatadir_mapping::CalculateLogicalSizeError;
use crate::tenant::{MaybeOffloaded, Timeline};
@@ -156,7 +156,7 @@ pub struct TimelineInputs {
/// initdb_lsn branchpoints* next_pitr_cutoff latest
/// ```
pub(super) async fn gather_inputs(
tenant: &TenantShard,
tenant: &Tenant,
limit: &Arc<Semaphore>,
max_retention_period: Option<u64>,
logical_size_cache: &mut HashMap<(TimelineId, Lsn), u64>,

View File

@@ -1620,7 +1620,7 @@ pub(crate) mod test {
use crate::tenant::harness::{TIMELINE_ID, TenantHarness};
use crate::tenant::storage_layer::{Layer, ResidentLayer};
use crate::tenant::vectored_blob_io::StreamingVectoredReadPlanner;
use crate::tenant::{TenantShard, Timeline};
use crate::tenant::{Tenant, Timeline};
/// Construct an index for a fictional delta layer and and then
/// traverse in order to plan vectored reads for a query. Finally,
@@ -2209,7 +2209,7 @@ pub(crate) mod test {
}
pub(crate) async fn produce_delta_layer(
tenant: &TenantShard,
tenant: &Tenant,
tline: &Arc<Timeline>,
mut deltas: Vec<(Key, Lsn, Value)>,
ctx: &RequestContext,

View File

@@ -559,12 +559,11 @@ impl ImageLayerInner {
let view = BufView::new_slice(&blobs_buf.buf);
for meta in blobs_buf.blobs.iter() {
// Just read the raw header+data and pass it through to the target layer, without
// decoding and recompressing it.
let raw = meta.raw_with_header(&view);
let img_buf = meta.read(&view).await?;
key_count += 1;
writer
.put_image_raw(meta.meta.key, raw.into_bytes(), ctx)
.put_image(meta.meta.key, img_buf.into_bytes(), ctx)
.await
.context(format!("Storing key {}", meta.meta.key))?;
}
@@ -854,41 +853,6 @@ impl ImageLayerWriterInner {
Ok(())
}
///
/// Write the next image to the file, as a raw blob header and data.
///
/// The page versions must be appended in blknum order.
///
async fn put_image_raw(
&mut self,
key: Key,
raw_with_header: Bytes,
ctx: &RequestContext,
) -> anyhow::Result<()> {
ensure!(self.key_range.contains(&key));
// NB: we don't update the (un)compressed metrics, since we can't determine them without
// decompressing the image. This seems okay.
self.num_keys += 1;
let (_, res) = self
.blob_writer
.write_blob_raw(raw_with_header.slice_len(), ctx)
.await;
let offset = res?;
let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
key.write_to_byte_slice(&mut keybuf);
self.tree.append(&keybuf, offset)?;
#[cfg(feature = "testing")]
{
self.last_written_key = key;
}
Ok(())
}
///
/// Finish writing the image layer.
///
@@ -924,13 +888,7 @@ impl ImageLayerWriterInner {
crate::metrics::COMPRESSION_IMAGE_INPUT_BYTES_CONSIDERED
.inc_by(self.uncompressed_bytes_eligible);
crate::metrics::COMPRESSION_IMAGE_INPUT_BYTES_CHOSEN.inc_by(self.uncompressed_bytes_chosen);
// NB: filter() may pass through raw pages from a different layer, without looking at
// whether these are compressed or not. We don't track metrics for these, so avoid
// increasing `COMPRESSION_IMAGE_OUTPUT_BYTES` in this case too.
if self.uncompressed_bytes > 0 {
crate::metrics::COMPRESSION_IMAGE_OUTPUT_BYTES.inc_by(compressed_size);
};
crate::metrics::COMPRESSION_IMAGE_OUTPUT_BYTES.inc_by(compressed_size);
let mut file = self.blob_writer.into_inner();
@@ -1076,25 +1034,6 @@ impl ImageLayerWriter {
self.inner.as_mut().unwrap().put_image(key, img, ctx).await
}
///
/// Write the next value to the file, as a raw header and data. This allows passing through a
/// raw, potentially compressed image from a different layer file without recompressing it.
///
/// The page versions must be appended in blknum order.
///
pub async fn put_image_raw(
&mut self,
key: Key,
raw_with_header: Bytes,
ctx: &RequestContext,
) -> anyhow::Result<()> {
self.inner
.as_mut()
.unwrap()
.put_image_raw(key, raw_with_header, ctx)
.await
}
/// Estimated size of the image layer.
pub(crate) fn estimated_size(&self) -> u64 {
let inner = self.inner.as_ref().unwrap();
@@ -1228,7 +1167,7 @@ mod test {
use crate::tenant::harness::{TIMELINE_ID, TenantHarness};
use crate::tenant::storage_layer::{Layer, ResidentLayer};
use crate::tenant::vectored_blob_io::StreamingVectoredReadPlanner;
use crate::tenant::{TenantShard, Timeline};
use crate::tenant::{Tenant, Timeline};
#[tokio::test]
async fn image_layer_rewrite() {
@@ -1410,7 +1349,7 @@ mod test {
}
async fn produce_image_layer(
tenant: &TenantShard,
tenant: &Tenant,
tline: &Arc<Timeline>,
mut images: Vec<(Key, Bytes)>,
lsn: Lsn,

View File

@@ -24,7 +24,7 @@ use crate::task_mgr::{self, BACKGROUND_RUNTIME, TOKIO_WORKER_THREADS, TaskKind};
use crate::tenant::throttle::Stats;
use crate::tenant::timeline::CompactionError;
use crate::tenant::timeline::compaction::CompactionOutcome;
use crate::tenant::{TenantShard, TenantState};
use crate::tenant::{Tenant, TenantState};
/// Semaphore limiting concurrent background tasks (across all tenants).
///
@@ -117,7 +117,7 @@ pub(crate) async fn acquire_concurrency_permit(
}
/// Start per tenant background loops: compaction, GC, and ingest housekeeping.
pub fn start_background_loops(tenant: &Arc<TenantShard>, can_start: Option<&Barrier>) {
pub fn start_background_loops(tenant: &Arc<Tenant>, can_start: Option<&Barrier>) {
let tenant_shard_id = tenant.tenant_shard_id;
task_mgr::spawn(
@@ -198,7 +198,7 @@ pub fn start_background_loops(tenant: &Arc<TenantShard>, can_start: Option<&Barr
}
/// Compaction task's main loop.
async fn compaction_loop(tenant: Arc<TenantShard>, cancel: CancellationToken) {
async fn compaction_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
const BASE_BACKOFF_SECS: f64 = 1.0;
const MAX_BACKOFF_SECS: f64 = 300.0;
const RECHECK_CONFIG_INTERVAL: Duration = Duration::from_secs(10);
@@ -348,7 +348,7 @@ pub(crate) fn log_compaction_error(
}
/// GC task's main loop.
async fn gc_loop(tenant: Arc<TenantShard>, cancel: CancellationToken) {
async fn gc_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
const MAX_BACKOFF_SECS: f64 = 300.0;
let mut error_run = 0; // consecutive errors
@@ -432,7 +432,7 @@ async fn gc_loop(tenant: Arc<TenantShard>, cancel: CancellationToken) {
}
/// Tenant housekeeping's main loop.
async fn tenant_housekeeping_loop(tenant: Arc<TenantShard>, cancel: CancellationToken) {
async fn tenant_housekeeping_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
let mut last_throttle_flag_reset_at = Instant::now();
loop {
if wait_for_active_tenant(&tenant, &cancel).await.is_break() {
@@ -483,7 +483,7 @@ async fn tenant_housekeeping_loop(tenant: Arc<TenantShard>, cancel: Cancellation
/// Waits until the tenant becomes active, or returns `ControlFlow::Break()` to shut down.
async fn wait_for_active_tenant(
tenant: &Arc<TenantShard>,
tenant: &Arc<Tenant>,
cancel: &CancellationToken,
) -> ControlFlow<()> {
if tenant.current_state() == TenantState::Active {

View File

@@ -412,7 +412,7 @@ pub struct Timeline {
/// Timeline deletion will acquire both compaction and gc locks in whatever order.
gc_lock: tokio::sync::Mutex<()>,
/// Cloned from [`super::TenantShard::pagestream_throttle`] on construction.
/// Cloned from [`super::Tenant::pagestream_throttle`] on construction.
pub(crate) pagestream_throttle: Arc<crate::tenant::throttle::Throttle>,
/// Size estimator for aux file v2
@@ -1285,10 +1285,6 @@ impl Timeline {
reconstruct_state: &mut ValuesReconstructState,
ctx: &RequestContext,
) -> Result<BTreeMap<Key, Result<Bytes, PageReconstructError>>, GetVectoredError> {
if query.is_empty() {
return Ok(BTreeMap::default());
}
let read_path = if self.conf.enable_read_path_debugging || ctx.read_path_debug() {
Some(ReadPath::new(
query.total_keyspace(),
@@ -2069,7 +2065,7 @@ impl Timeline {
pub(crate) fn activate(
self: &Arc<Self>,
parent: Arc<crate::tenant::TenantShard>,
parent: Arc<crate::tenant::Tenant>,
broker_client: BrokerClientChannel,
background_jobs_can_start: Option<&completion::Barrier>,
ctx: &RequestContext,
@@ -2494,12 +2490,11 @@ impl Timeline {
tenant_conf.is_gc_blocked_by_lsn_lease_deadline()
}
pub(crate) fn get_lazy_slru_download(&self) -> bool {
pub(crate) fn get_lazy_slru_download(&self, lazy_slru_download_enabled_by_cp: bool) -> bool {
let tenant_conf = self.tenant_conf.load();
tenant_conf
.tenant_conf
.lazy_slru_download
.unwrap_or(self.conf.default_tenant_conf.lazy_slru_download)
tenant_conf.tenant_conf.lazy_slru_download.unwrap_or(
lazy_slru_download_enabled_by_cp || self.conf.default_tenant_conf.lazy_slru_download,
)
}
/// Checks if a get page request should get perf tracing
@@ -2706,14 +2701,6 @@ impl Timeline {
.clone()
}
pub fn get_compaction_shard_ancestor(&self) -> bool {
let tenant_conf = self.tenant_conf.load();
tenant_conf
.tenant_conf
.compaction_shard_ancestor
.unwrap_or(self.conf.default_tenant_conf.compaction_shard_ancestor)
}
fn get_eviction_policy(&self) -> EvictionPolicy {
let tenant_conf = self.tenant_conf.load();
tenant_conf
@@ -3329,7 +3316,7 @@ impl Timeline {
// (1) and (4)
// TODO: this is basically a no-op now, should we remove it?
self.remote_client.schedule_barrier()?;
// TenantShard::create_timeline will wait for these uploads to happen before returning, or
// Tenant::create_timeline will wait for these uploads to happen before returning, or
// on retry.
// Now that we have the full layer map, we may calculate the visibility of layers within it (a global scan)
@@ -5758,7 +5745,7 @@ impl Timeline {
/// from our ancestor to be branches of this timeline.
pub(crate) async fn prepare_to_detach_from_ancestor(
self: &Arc<Timeline>,
tenant: &crate::tenant::TenantShard,
tenant: &crate::tenant::Tenant,
options: detach_ancestor::Options,
behavior: DetachBehavior,
ctx: &RequestContext,
@@ -5777,7 +5764,7 @@ impl Timeline {
/// resetting the tenant.
pub(crate) async fn detach_from_ancestor_and_reparent(
self: &Arc<Timeline>,
tenant: &crate::tenant::TenantShard,
tenant: &crate::tenant::Tenant,
prepared: detach_ancestor::PreparedTimelineDetach,
ancestor_timeline_id: TimelineId,
ancestor_lsn: Lsn,
@@ -5801,7 +5788,7 @@ impl Timeline {
/// The tenant must've been reset if ancestry was modified previously (in tenant manager).
pub(crate) async fn complete_detaching_timeline_ancestor(
self: &Arc<Timeline>,
tenant: &crate::tenant::TenantShard,
tenant: &crate::tenant::Tenant,
attempt: detach_ancestor::Attempt,
ctx: &RequestContext,
) -> Result<(), detach_ancestor::Error> {
@@ -6863,14 +6850,14 @@ impl Timeline {
/// Persistently blocks gc for `Manual` reason.
///
/// Returns true if no such block existed before, false otherwise.
pub(crate) async fn block_gc(&self, tenant: &super::TenantShard) -> anyhow::Result<bool> {
pub(crate) async fn block_gc(&self, tenant: &super::Tenant) -> anyhow::Result<bool> {
use crate::tenant::remote_timeline_client::index::GcBlockingReason;
assert_eq!(self.tenant_shard_id, tenant.tenant_shard_id);
tenant.gc_block.insert(self, GcBlockingReason::Manual).await
}
/// Persistently unblocks gc for `Manual` reason.
pub(crate) async fn unblock_gc(&self, tenant: &super::TenantShard) -> anyhow::Result<()> {
pub(crate) async fn unblock_gc(&self, tenant: &super::Tenant) -> anyhow::Result<()> {
use crate::tenant::remote_timeline_client::index::GcBlockingReason;
assert_eq!(self.tenant_shard_id, tenant.tenant_shard_id);
tenant.gc_block.remove(self, GcBlockingReason::Manual).await
@@ -6888,8 +6875,8 @@ impl Timeline {
/// Force create an image layer and place it into the layer map.
///
/// DO NOT use this function directly. Use [`TenantShard::branch_timeline_test_with_layers`]
/// or [`TenantShard::create_test_timeline_with_layers`] to ensure all these layers are
/// DO NOT use this function directly. Use [`Tenant::branch_timeline_test_with_layers`]
/// or [`Tenant::create_test_timeline_with_layers`] to ensure all these layers are
/// placed into the layer map in one run AND be validated.
#[cfg(test)]
pub(super) async fn force_create_image_layer(
@@ -6945,8 +6932,8 @@ impl Timeline {
/// Force create a delta layer and place it into the layer map.
///
/// DO NOT use this function directly. Use [`TenantShard::branch_timeline_test_with_layers`]
/// or [`TenantShard::create_test_timeline_with_layers`] to ensure all these layers are
/// DO NOT use this function directly. Use [`Tenant::branch_timeline_test_with_layers`]
/// or [`Tenant::create_test_timeline_with_layers`] to ensure all these layers are
/// placed into the layer map in one run AND be validated.
#[cfg(test)]
pub(super) async fn force_create_delta_layer(

View File

@@ -77,7 +77,7 @@ const COMPACTION_DELTA_THRESHOLD: usize = 5;
/// shard split, which gets expensive for large tenants.
const ANCESTOR_COMPACTION_REWRITE_THRESHOLD: f64 = 0.3;
#[derive(Default, Debug, Clone, Copy, Hash, PartialEq, Eq, Serialize)]
#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]
pub struct GcCompactionJobId(pub usize);
impl std::fmt::Display for GcCompactionJobId {
@@ -105,50 +105,6 @@ pub enum GcCompactionQueueItem {
Notify(GcCompactionJobId, Option<Lsn>),
}
/// Statistics for gc-compaction meta jobs, which contains several sub compaction jobs.
#[derive(Debug, Clone, Serialize, Default)]
pub struct GcCompactionMetaStatistics {
/// The total number of sub compaction jobs.
pub total_sub_compaction_jobs: usize,
/// The total number of sub compaction jobs that failed.
pub failed_sub_compaction_jobs: usize,
/// The total number of sub compaction jobs that succeeded.
pub succeeded_sub_compaction_jobs: usize,
/// The layer size before compaction.
pub before_compaction_layer_size: u64,
/// The layer size after compaction.
pub after_compaction_layer_size: u64,
/// The start time of the meta job.
pub start_time: Option<chrono::DateTime<chrono::Utc>>,
/// The end time of the meta job.
pub end_time: Option<chrono::DateTime<chrono::Utc>>,
/// The duration of the meta job.
pub duration_secs: f64,
/// The id of the meta job.
pub meta_job_id: GcCompactionJobId,
/// The LSN below which the layers are compacted, used to compute the statistics.
pub below_lsn: Lsn,
/// The retention ratio of the meta job (after_compaction_layer_size / before_compaction_layer_size)
pub retention_ratio: f64,
}
impl GcCompactionMetaStatistics {
fn finalize(&mut self) {
let end_time = chrono::Utc::now();
if let Some(start_time) = self.start_time {
if end_time > start_time {
let delta = end_time - start_time;
if let Ok(std_dur) = delta.to_std() {
self.duration_secs = std_dur.as_secs_f64();
}
}
}
self.retention_ratio = self.after_compaction_layer_size as f64
/ (self.before_compaction_layer_size as f64 + 1.0);
self.end_time = Some(end_time);
}
}
impl GcCompactionQueueItem {
pub fn into_compact_info_resp(
self,
@@ -186,7 +142,6 @@ struct GcCompactionQueueInner {
queued: VecDeque<(GcCompactionJobId, GcCompactionQueueItem)>,
guards: HashMap<GcCompactionJobId, GcCompactionGuardItems>,
last_id: GcCompactionJobId,
meta_statistics: Option<GcCompactionMetaStatistics>,
}
impl GcCompactionQueueInner {
@@ -218,7 +173,6 @@ impl GcCompactionQueue {
queued: VecDeque::new(),
guards: HashMap::new(),
last_id: GcCompactionJobId(0),
meta_statistics: None,
}),
consumer_lock: tokio::sync::Mutex::new(()),
}
@@ -403,23 +357,6 @@ impl GcCompactionQueue {
Ok(())
}
async fn collect_layer_below_lsn(
&self,
timeline: &Arc<Timeline>,
lsn: Lsn,
) -> Result<u64, CompactionError> {
let guard = timeline.layers.read().await;
let layer_map = guard.layer_map()?;
let layers = layer_map.iter_historic_layers().collect_vec();
let mut size = 0;
for layer in layers {
if layer.lsn_range.start <= lsn {
size += layer.file_size();
}
}
Ok(size)
}
/// Notify the caller the job has finished and unblock GC.
fn notify_and_unblock(&self, id: GcCompactionJobId) {
info!("compaction job id={} finished", id);
@@ -429,16 +366,6 @@ impl GcCompactionQueue {
let _ = tx.send(());
}
}
if let Some(ref meta_statistics) = guard.meta_statistics {
if meta_statistics.meta_job_id == id {
if let Ok(stats) = serde_json::to_string(&meta_statistics) {
info!(
"gc-compaction meta statistics for job id = {}: {}",
id, stats
);
}
}
}
}
fn clear_running_job(&self) {
@@ -478,11 +405,7 @@ impl GcCompactionQueue {
let mut pending_tasks = Vec::new();
// gc-compaction might pick more layers or fewer layers to compact. The L2 LSN does not need to be accurate.
// And therefore, we simply assume the maximum LSN of all jobs is the expected L2 LSN.
let expected_l2_lsn = jobs
.iter()
.map(|job| job.compact_lsn_range.end)
.max()
.unwrap();
let expected_l2_lsn = jobs.iter().map(|job| job.compact_lsn_range.end).max();
for job in jobs {
// Unfortunately we need to convert the `GcCompactJob` back to `CompactionOptions`
// until we do further refactors to allow directly call `compact_with_gc`.
@@ -507,13 +430,9 @@ impl GcCompactionQueue {
if !auto {
pending_tasks.push(GcCompactionQueueItem::Notify(id, None));
} else {
pending_tasks.push(GcCompactionQueueItem::Notify(id, Some(expected_l2_lsn)));
pending_tasks.push(GcCompactionQueueItem::Notify(id, expected_l2_lsn));
}
let layer_size = self
.collect_layer_below_lsn(timeline, expected_l2_lsn)
.await?;
{
let mut guard = self.inner.lock().unwrap();
let mut tasks = Vec::new();
@@ -525,16 +444,7 @@ impl GcCompactionQueue {
for item in tasks {
guard.queued.push_front(item);
}
guard.meta_statistics = Some(GcCompactionMetaStatistics {
meta_job_id: id,
start_time: Some(chrono::Utc::now()),
before_compaction_layer_size: layer_size,
below_lsn: expected_l2_lsn,
total_sub_compaction_jobs: jobs_len,
..Default::default()
});
}
info!(
"scheduled enhanced gc bottom-most compaction with sub-compaction, split into {} jobs",
jobs_len
@@ -663,10 +573,6 @@ impl GcCompactionQueue {
Err(err) => {
warn!(%err, "failed to run gc-compaction subcompaction job");
self.clear_running_job();
let mut guard = self.inner.lock().unwrap();
if let Some(ref mut meta_statistics) = guard.meta_statistics {
meta_statistics.failed_sub_compaction_jobs += 1;
}
return Err(err);
}
};
@@ -676,34 +582,8 @@ impl GcCompactionQueue {
// we need to clean things up before returning from the function.
yield_for_l0 = true;
}
{
let mut guard = self.inner.lock().unwrap();
if let Some(ref mut meta_statistics) = guard.meta_statistics {
meta_statistics.succeeded_sub_compaction_jobs += 1;
}
}
}
GcCompactionQueueItem::Notify(id, l2_lsn) => {
let below_lsn = {
let mut guard = self.inner.lock().unwrap();
if let Some(ref mut meta_statistics) = guard.meta_statistics {
meta_statistics.below_lsn
} else {
Lsn::INVALID
}
};
let layer_size = if below_lsn != Lsn::INVALID {
self.collect_layer_below_lsn(timeline, below_lsn).await?
} else {
0
};
{
let mut guard = self.inner.lock().unwrap();
if let Some(ref mut meta_statistics) = guard.meta_statistics {
meta_statistics.after_compaction_layer_size = layer_size;
meta_statistics.finalize();
}
}
self.notify_and_unblock(id);
if let Some(l2_lsn) = l2_lsn {
let current_l2_lsn = timeline
@@ -1359,7 +1239,8 @@ impl Timeline {
let partition_count = self.partitioning.read().0.0.parts.len();
// 4. Shard ancestor compaction
if self.get_compaction_shard_ancestor() && self.shard_identity.count >= ShardCount::new(2) {
if self.shard_identity.count >= ShardCount::new(2) {
// Limit the number of layer rewrites to the number of partitions: this means its
// runtime should be comparable to a full round of image layer creations, rather than
// being potentially much longer.

View File

@@ -18,8 +18,8 @@ use crate::tenant::remote_timeline_client::{
PersistIndexPartWithDeletedFlagError, RemoteTimelineClient,
};
use crate::tenant::{
CreateTimelineCause, DeleteTimelineError, MaybeDeletedIndexPart, TenantManifestError,
TenantShard, Timeline, TimelineOrOffloaded,
CreateTimelineCause, DeleteTimelineError, MaybeDeletedIndexPart, Tenant, TenantManifestError,
Timeline, TimelineOrOffloaded,
};
use crate::virtual_file::MaybeFatalIo;
@@ -113,7 +113,7 @@ pub(super) async fn delete_local_timeline_directory(
/// It is important that this gets called when DeletionGuard is being held.
/// For more context see comments in [`make_timeline_delete_guard`]
async fn remove_maybe_offloaded_timeline_from_tenant(
tenant: &TenantShard,
tenant: &Tenant,
timeline: &TimelineOrOffloaded,
_: &DeletionGuard, // using it as a witness
) -> anyhow::Result<()> {
@@ -192,7 +192,7 @@ impl DeleteTimelineFlow {
// error out if some of the shutdown tasks have already been completed!
#[instrument(skip_all)]
pub async fn run(
tenant: &Arc<TenantShard>,
tenant: &Arc<Tenant>,
timeline_id: TimelineId,
) -> Result<(), DeleteTimelineError> {
super::debug_assert_current_span_has_tenant_and_timeline_id();
@@ -288,7 +288,7 @@ impl DeleteTimelineFlow {
/// Shortcut to create Timeline in stopping state and spawn deletion task.
#[instrument(skip_all, fields(%timeline_id))]
pub(crate) async fn resume_deletion(
tenant: Arc<TenantShard>,
tenant: Arc<Tenant>,
timeline_id: TimelineId,
local_metadata: &TimelineMetadata,
remote_client: RemoteTimelineClient,
@@ -338,7 +338,7 @@ impl DeleteTimelineFlow {
fn schedule_background(
guard: DeletionGuard,
conf: &'static PageServerConf,
tenant: Arc<TenantShard>,
tenant: Arc<Tenant>,
timeline: TimelineOrOffloaded,
remote_client: Arc<RemoteTimelineClient>,
) {
@@ -381,7 +381,7 @@ impl DeleteTimelineFlow {
async fn background(
mut guard: DeletionGuard,
conf: &PageServerConf,
tenant: &TenantShard,
tenant: &Tenant,
timeline: &TimelineOrOffloaded,
remote_client: Arc<RemoteTimelineClient>,
) -> Result<(), DeleteTimelineError> {
@@ -435,7 +435,7 @@ pub(super) enum TimelineDeleteGuardKind {
}
pub(super) fn make_timeline_delete_guard(
tenant: &TenantShard,
tenant: &Tenant,
timeline_id: TimelineId,
guard_kind: TimelineDeleteGuardKind,
) -> Result<(TimelineOrOffloaded, DeletionGuard), DeleteTimelineError> {

View File

@@ -23,7 +23,7 @@ use super::layer_manager::LayerManager;
use super::{FlushLayerError, Timeline};
use crate::context::{DownloadBehavior, RequestContext};
use crate::task_mgr::TaskKind;
use crate::tenant::TenantShard;
use crate::tenant::Tenant;
use crate::tenant::remote_timeline_client::index::GcBlockingReason::DetachAncestor;
use crate::tenant::storage_layer::layer::local_layer_path;
use crate::tenant::storage_layer::{
@@ -265,7 +265,7 @@ async fn generate_tombstone_image_layer(
/// See [`Timeline::prepare_to_detach_from_ancestor`]
pub(super) async fn prepare(
detached: &Arc<Timeline>,
tenant: &TenantShard,
tenant: &Tenant,
behavior: DetachBehavior,
options: Options,
ctx: &RequestContext,
@@ -590,7 +590,7 @@ pub(super) async fn prepare(
async fn start_new_attempt(
detached: &Timeline,
tenant: &TenantShard,
tenant: &Tenant,
ancestor_timeline_id: TimelineId,
ancestor_lsn: Lsn,
) -> Result<Attempt, Error> {
@@ -611,7 +611,7 @@ async fn start_new_attempt(
async fn continue_with_blocked_gc(
detached: &Timeline,
tenant: &TenantShard,
tenant: &Tenant,
ancestor_timeline_id: TimelineId,
ancestor_lsn: Lsn,
) -> Result<Attempt, Error> {
@@ -622,7 +622,7 @@ async fn continue_with_blocked_gc(
fn obtain_exclusive_attempt(
detached: &Timeline,
tenant: &TenantShard,
tenant: &Tenant,
ancestor_timeline_id: TimelineId,
ancestor_lsn: Lsn,
) -> Result<Attempt, Error> {
@@ -655,7 +655,7 @@ fn obtain_exclusive_attempt(
fn reparented_direct_children(
detached: &Arc<Timeline>,
tenant: &TenantShard,
tenant: &Tenant,
) -> Result<HashSet<TimelineId>, Error> {
let mut all_direct_children = tenant
.timelines
@@ -950,7 +950,7 @@ impl DetachingAndReparenting {
/// See [`Timeline::detach_from_ancestor_and_reparent`].
pub(super) async fn detach_and_reparent(
detached: &Arc<Timeline>,
tenant: &TenantShard,
tenant: &Tenant,
prepared: PreparedTimelineDetach,
ancestor_timeline_id: TimelineId,
ancestor_lsn: Lsn,
@@ -1184,7 +1184,7 @@ pub(super) async fn detach_and_reparent(
pub(super) async fn complete(
detached: &Arc<Timeline>,
tenant: &TenantShard,
tenant: &Tenant,
mut attempt: Attempt,
_ctx: &RequestContext,
) -> Result<(), Error> {
@@ -1258,7 +1258,7 @@ where
}
fn check_no_archived_children_of_ancestor(
tenant: &TenantShard,
tenant: &Tenant,
detached: &Arc<Timeline>,
ancestor: &Arc<Timeline>,
ancestor_lsn: Lsn,

View File

@@ -33,7 +33,7 @@ use crate::tenant::size::CalculateSyntheticSizeError;
use crate::tenant::storage_layer::LayerVisibilityHint;
use crate::tenant::tasks::{BackgroundLoopKind, BackgroundLoopSemaphorePermit, sleep_random};
use crate::tenant::timeline::EvictionError;
use crate::tenant::{LogicalSizeCalculationCause, TenantShard};
use crate::tenant::{LogicalSizeCalculationCause, Tenant};
#[derive(Default)]
pub struct EvictionTaskTimelineState {
@@ -48,7 +48,7 @@ pub struct EvictionTaskTenantState {
impl Timeline {
pub(super) fn launch_eviction_task(
self: &Arc<Self>,
parent: Arc<TenantShard>,
parent: Arc<Tenant>,
background_tasks_can_start: Option<&completion::Barrier>,
) {
let self_clone = Arc::clone(self);
@@ -75,7 +75,7 @@ impl Timeline {
}
#[instrument(skip_all, fields(tenant_id = %self.tenant_shard_id.tenant_id, shard_id = %self.tenant_shard_id.shard_slug(), timeline_id = %self.timeline_id))]
async fn eviction_task(self: Arc<Self>, tenant: Arc<TenantShard>) {
async fn eviction_task(self: Arc<Self>, tenant: Arc<Tenant>) {
// acquire the gate guard only once within a useful span
let Ok(guard) = self.gate.enter() else {
return;
@@ -118,7 +118,7 @@ impl Timeline {
#[instrument(skip_all, fields(policy_kind = policy.discriminant_str()))]
async fn eviction_iteration(
self: &Arc<Self>,
tenant: &TenantShard,
tenant: &Tenant,
policy: &EvictionPolicy,
cancel: &CancellationToken,
gate: &GateGuard,
@@ -175,7 +175,7 @@ impl Timeline {
async fn eviction_iteration_threshold(
self: &Arc<Self>,
tenant: &TenantShard,
tenant: &Tenant,
p: &EvictionPolicyLayerAccessThreshold,
cancel: &CancellationToken,
gate: &GateGuard,
@@ -309,7 +309,7 @@ impl Timeline {
/// disk usage based eviction task.
async fn imitiate_only(
self: &Arc<Self>,
tenant: &TenantShard,
tenant: &Tenant,
p: &EvictionPolicyLayerAccessThreshold,
cancel: &CancellationToken,
gate: &GateGuard,
@@ -363,7 +363,7 @@ impl Timeline {
#[instrument(skip_all)]
async fn imitate_layer_accesses(
&self,
tenant: &TenantShard,
tenant: &Tenant,
p: &EvictionPolicyLayerAccessThreshold,
cancel: &CancellationToken,
gate: &GateGuard,
@@ -499,7 +499,7 @@ impl Timeline {
#[instrument(skip_all)]
async fn imitate_synthetic_size_calculation_worker(
&self,
tenant: &TenantShard,
tenant: &Tenant,
cancel: &CancellationToken,
ctx: &RequestContext,
) {

View File

@@ -1,6 +1,6 @@
//! FIXME: most of this is copy-paste from mgmt_api.rs ; dedupe into a `reqwest_utils::Client` crate.
use pageserver_client::mgmt_api::{Error, ResponseErrorMessageExt};
use reqwest::{Certificate, Method};
use reqwest::Method;
use serde::{Deserialize, Serialize};
use tokio_util::sync::CancellationToken;
use tracing::error;
@@ -34,7 +34,7 @@ impl Client {
};
let mut http_client = reqwest::Client::builder();
for cert in &conf.ssl_ca_certs {
http_client = http_client.add_root_certificate(Certificate::from_der(cert.contents())?);
http_client = http_client.add_root_certificate(cert.clone());
}
let http_client = http_client.build()?;

View File

@@ -8,7 +8,7 @@ use crate::span::debug_assert_current_span_has_tenant_and_timeline_id;
use crate::tenant::remote_timeline_client::ShutdownIfArchivedError;
use crate::tenant::timeline::delete::{TimelineDeleteGuardKind, make_timeline_delete_guard};
use crate::tenant::{
DeleteTimelineError, OffloadedTimeline, TenantManifestError, TenantShard, TimelineOrOffloaded,
DeleteTimelineError, OffloadedTimeline, Tenant, TenantManifestError, TimelineOrOffloaded,
};
#[derive(thiserror::Error, Debug)]
@@ -33,7 +33,7 @@ impl From<TenantManifestError> for OffloadError {
}
pub(crate) async fn offload_timeline(
tenant: &TenantShard,
tenant: &Tenant,
timeline: &Arc<Timeline>,
) -> Result<(), OffloadError> {
debug_assert_current_span_has_tenant_and_timeline_id();
@@ -123,7 +123,7 @@ pub(crate) async fn offload_timeline(
///
/// Returns the strong count of the timeline `Arc`
fn remove_timeline_from_tenant(
tenant: &TenantShard,
tenant: &Tenant,
timeline: &Timeline,
_: &DeletionGuard, // using it as a witness
) -> usize {

View File

@@ -15,19 +15,17 @@ use super::Timeline;
use crate::context::RequestContext;
use crate::import_datadir;
use crate::span::debug_assert_current_span_has_tenant_and_timeline_id;
use crate::tenant::{
CreateTimelineError, CreateTimelineIdempotency, TenantShard, TimelineOrOffloaded,
};
use crate::tenant::{CreateTimelineError, CreateTimelineIdempotency, Tenant, TimelineOrOffloaded};
/// A timeline with some of its files on disk, being initialized.
/// This struct ensures the atomicity of the timeline init: it's either properly created and inserted into pageserver's memory, or
/// its local files are removed. If we crash while this class exists, then the timeline's local
/// state is cleaned up during [`TenantShard::clean_up_timelines`], because the timeline's content isn't in remote storage.
/// state is cleaned up during [`Tenant::clean_up_timelines`], because the timeline's content isn't in remote storage.
///
/// The caller is responsible for proper timeline data filling before the final init.
#[must_use]
pub struct UninitializedTimeline<'t> {
pub(crate) owning_tenant: &'t TenantShard,
pub(crate) owning_tenant: &'t Tenant,
timeline_id: TimelineId,
raw_timeline: Option<(Arc<Timeline>, TimelineCreateGuard)>,
/// Whether we spawned the inner Timeline's tasks such that we must later shut it down
@@ -37,7 +35,7 @@ pub struct UninitializedTimeline<'t> {
impl<'t> UninitializedTimeline<'t> {
pub(crate) fn new(
owning_tenant: &'t TenantShard,
owning_tenant: &'t Tenant,
timeline_id: TimelineId,
raw_timeline: Option<(Arc<Timeline>, TimelineCreateGuard)>,
) -> Self {
@@ -158,7 +156,7 @@ impl<'t> UninitializedTimeline<'t> {
/// Prepares timeline data by loading it from the basebackup archive.
pub(crate) async fn import_basebackup_from_tar(
mut self,
tenant: Arc<TenantShard>,
tenant: Arc<Tenant>,
copyin_read: &mut (impl tokio::io::AsyncRead + Send + Sync + Unpin),
base_lsn: Lsn,
broker_client: storage_broker::BrokerClientChannel,
@@ -229,17 +227,17 @@ pub(crate) fn cleanup_timeline_directory(create_guard: TimelineCreateGuard) {
error!("Failed to clean up uninitialized timeline directory {timeline_path:?}: {e:?}")
}
}
// Having cleaned up, we can release this TimelineId in `[TenantShard::timelines_creating]` to allow other
// Having cleaned up, we can release this TimelineId in `[Tenant::timelines_creating]` to allow other
// timeline creation attempts under this TimelineId to proceed
drop(create_guard);
}
/// A guard for timeline creations in process: as long as this object exists, the timeline ID
/// is kept in `[TenantShard::timelines_creating]` to exclude concurrent attempts to create the same timeline.
/// is kept in `[Tenant::timelines_creating]` to exclude concurrent attempts to create the same timeline.
#[must_use]
pub(crate) struct TimelineCreateGuard {
pub(crate) _tenant_gate_guard: GateGuard,
pub(crate) owning_tenant: Arc<TenantShard>,
pub(crate) owning_tenant: Arc<Tenant>,
pub(crate) timeline_id: TimelineId,
pub(crate) timeline_path: Utf8PathBuf,
pub(crate) idempotency: CreateTimelineIdempotency,
@@ -265,7 +263,7 @@ pub(crate) enum TimelineExclusionError {
impl TimelineCreateGuard {
pub(crate) fn new(
owning_tenant: &Arc<TenantShard>,
owning_tenant: &Arc<Tenant>,
timeline_id: TimelineId,
timeline_path: Utf8PathBuf,
idempotency: CreateTimelineIdempotency,

View File

@@ -26,7 +26,7 @@ use utils::lsn::Lsn;
use utils::vec_map::VecMap;
use crate::context::RequestContext;
use crate::tenant::blob_io::{BYTE_UNCOMPRESSED, BYTE_ZSTD, Header};
use crate::tenant::blob_io::{BYTE_UNCOMPRESSED, BYTE_ZSTD, LEN_COMPRESSION_BIT_MASK};
use crate::virtual_file::{self, IoBufferMut, VirtualFile};
/// Metadata bundled with the start and end offset of a blob.
@@ -111,20 +111,18 @@ impl From<Bytes> for BufView<'_> {
pub struct VectoredBlob {
/// Blob metadata.
pub meta: BlobMeta,
/// Header start offset.
header_start: usize,
/// Data start offset.
data_start: usize,
/// Start offset.
start: usize,
/// End offset.
end: usize,
/// Compression used on the data, extracted from the header.
/// Compression used on the the blob.
compression_bits: u8,
}
impl VectoredBlob {
/// Reads a decompressed view of the blob.
pub(crate) async fn read<'a>(&self, buf: &BufView<'a>) -> Result<BufView<'a>, std::io::Error> {
let view = buf.view(self.data_start..self.end);
let view = buf.view(self.start..self.end);
match self.compression_bits {
BYTE_UNCOMPRESSED => Ok(view),
@@ -142,18 +140,13 @@ impl VectoredBlob {
std::io::ErrorKind::InvalidData,
format!(
"Failed to decompress blob for {}@{}, {}..{}: invalid compression byte {bits:x}",
self.meta.key, self.meta.lsn, self.data_start, self.end
self.meta.key, self.meta.lsn, self.start, self.end
),
);
Err(error)
}
}
}
/// Returns the raw blob including header.
pub(crate) fn raw_with_header<'a>(&self, buf: &BufView<'a>) -> BufView<'a> {
buf.view(self.header_start..self.end)
}
}
impl std::fmt::Display for VectoredBlob {
@@ -161,7 +154,7 @@ impl std::fmt::Display for VectoredBlob {
write!(
f,
"{}@{}, {}..{}",
self.meta.key, self.meta.lsn, self.data_start, self.end
self.meta.key, self.meta.lsn, self.start, self.end
)
}
}
@@ -500,28 +493,50 @@ impl<'a> VectoredBlobReader<'a> {
let blobs_at = read.blobs_at.as_slice();
let mut blobs = Vec::with_capacity(blobs_at.len());
let start_offset = read.start;
let mut metas = Vec::with_capacity(blobs_at.len());
// Blobs in `read` only provide their starting offset. The end offset
// of a blob is implicit: the start of the next blob if one exists
// or the end of the read.
for (blob_start, meta) in blobs_at.iter().copied() {
let header_start = (blob_start - read.start) as usize;
let header = Header::decode(&buf[header_start..])?;
let data_start = header_start + header.header_len;
let end = data_start + header.data_len;
let compression_bits = header.compression_bits;
for (blob_start, meta) in blobs_at {
let blob_start_in_buf = blob_start - start_offset;
let first_len_byte = buf[blob_start_in_buf as usize];
blobs.push(VectoredBlob {
header_start,
data_start,
// Each blob is prefixed by a header containing its size and compression information.
// Extract the size and skip that header to find the start of the data.
// The size can be 1 or 4 bytes. The most significant bit is 0 in the
// 1 byte case and 1 in the 4 byte case.
let (size_length, blob_size, compression_bits) = if first_len_byte < 0x80 {
(1, first_len_byte as u64, BYTE_UNCOMPRESSED)
} else {
let mut blob_size_buf = [0u8; 4];
let offset_in_buf = blob_start_in_buf as usize;
blob_size_buf.copy_from_slice(&buf[offset_in_buf..offset_in_buf + 4]);
blob_size_buf[0] &= !LEN_COMPRESSION_BIT_MASK;
let compression_bits = first_len_byte & LEN_COMPRESSION_BIT_MASK;
(
4,
u32::from_be_bytes(blob_size_buf) as u64,
compression_bits,
)
};
let start = (blob_start_in_buf + size_length) as usize;
let end = start + blob_size as usize;
metas.push(VectoredBlob {
start,
end,
meta,
meta: *meta,
compression_bits,
});
}
Ok(VectoredBlobsBuf { buf, blobs })
Ok(VectoredBlobsBuf { buf, blobs: metas })
}
}
@@ -982,15 +997,6 @@ mod tests {
&read_buf[..],
"mismatch for idx={idx} at offset={offset}"
);
// Check that raw_with_header returns a valid header.
let raw = read_blob.raw_with_header(&view);
let header = Header::decode(&raw)?;
if !compression || header.header_len == 1 {
assert_eq!(header.compression_bits, BYTE_UNCOMPRESSED);
}
assert_eq!(raw.len(), header.total_len());
buf = result.buf;
}
Ok(())

View File

@@ -1366,8 +1366,7 @@ pub(crate) type IoBuffer = AlignedBuffer<ConstAlign<{ get_io_buffer_alignment()
pub(crate) type IoPageSlice<'a> =
AlignedSlice<'a, PAGE_SZ, ConstAlign<{ get_io_buffer_alignment() }>>;
static IO_MODE: once_cell::sync::Lazy<AtomicU8> =
once_cell::sync::Lazy::new(|| AtomicU8::new(IoMode::preferred() as u8));
static IO_MODE: AtomicU8 = AtomicU8::new(IoMode::preferred() as u8);
pub(crate) fn set_io_mode(mode: IoMode) {
IO_MODE.store(mode as u8, std::sync::atomic::Ordering::Relaxed);

View File

@@ -95,7 +95,7 @@ static uint32 local_request_counter;
* Various settings related to prompt (fast) handling of PageStream responses
* at any CHECK_FOR_INTERRUPTS point.
*/
int readahead_getpage_pull_timeout_ms = 50;
int readahead_getpage_pull_timeout_ms = 0;
static int PS_TIMEOUT_ID = 0;
static bool timeout_set = false;
static bool timeout_signaled = false;

View File

@@ -75,7 +75,7 @@ char *neon_auth_token;
int readahead_buffer_size = 128;
int flush_every_n_requests = 8;
int neon_protocol_version = 3;
int neon_protocol_version = 2;
static int neon_compute_mode = 0;
static int max_reconnect_attempts = 60;
@@ -1362,7 +1362,7 @@ pg_init_libpagestore(void)
"",
PGC_POSTMASTER,
0, /* no flags required */
NULL, NULL, NULL);
check_neon_id, NULL, NULL);
DefineCustomStringVariable("neon.branch_id",
"Neon branch_id the server is running on",
NULL,
@@ -1370,7 +1370,7 @@ pg_init_libpagestore(void)
"",
PGC_POSTMASTER,
0, /* no flags required */
NULL, NULL, NULL);
check_neon_id, NULL, NULL);
DefineCustomStringVariable("neon.endpoint_id",
"Neon endpoint_id the server is running on",
NULL,
@@ -1378,7 +1378,7 @@ pg_init_libpagestore(void)
"",
PGC_POSTMASTER,
0, /* no flags required */
NULL, NULL, NULL);
check_neon_id, NULL, NULL);
DefineCustomIntVariable("neon.stripe_size",
"sharding stripe size",
@@ -1432,7 +1432,7 @@ pg_init_libpagestore(void)
"PageStream connection when we have pages which "
"were read ahead but not yet received.",
&readahead_getpage_pull_timeout_ms,
50, 0, 5 * 60 * 1000,
0, 0, 5 * 60 * 1000,
PGC_USERSET,
GUC_UNIT_MS,
NULL, NULL, NULL);
@@ -1440,7 +1440,7 @@ pg_init_libpagestore(void)
"Version of compute<->page server protocol",
NULL,
&neon_protocol_version,
3, /* use protocol version 3 */
2, /* use protocol version 2 */
2, /* min */
3, /* max */
PGC_SU_BACKEND,

View File

@@ -803,13 +803,7 @@ neon_create(SMgrRelation reln, ForkNumber forkNum, bool isRedo)
case RELPERSISTENCE_TEMP:
case RELPERSISTENCE_UNLOGGED:
#ifdef DEBUG_COMPARE_LOCAL
mdcreate(reln, forkNum, forkNum == INIT_FORKNUM || isRedo);
if (forkNum == MAIN_FORKNUM)
mdcreate(reln, INIT_FORKNUM, true);
#else
mdcreate(reln, forkNum, isRedo);
#endif
return;
default:
@@ -1979,10 +1973,6 @@ neon_start_unlogged_build(SMgrRelation reln)
case RELPERSISTENCE_UNLOGGED:
unlogged_build_rel = reln;
unlogged_build_phase = UNLOGGED_BUILD_NOT_PERMANENT;
#ifdef DEBUG_COMPARE_LOCAL
if (!IsParallelWorker())
mdcreate(reln, INIT_FORKNUM, true);
#endif
return;
default:
@@ -2005,14 +1995,12 @@ neon_start_unlogged_build(SMgrRelation reln)
* FIXME: should we pass isRedo true to create the tablespace dir if it
* doesn't exist? Is it needed?
*/
if (!IsParallelWorker())
{
#ifndef DEBUG_COMPARE_LOCAL
if (!IsParallelWorker())
mdcreate(reln, MAIN_FORKNUM, false);
#else
mdcreate(reln, INIT_FORKNUM, true);
mdcreate(reln, INIT_FORKNUM, false);
#endif
}
}
/*
@@ -2052,7 +2040,7 @@ neon_finish_unlogged_build_phase_1(SMgrRelation reln)
/*
* neon_end_unlogged_build() -- Finish an unlogged rel build.
*
* Call this after you have finished WAL-logging a relation that was
* Call this after you have finished WAL-logging an relation that was
* first populated without WAL-logging.
*
* This removes the local copy of the rel, since it's now been fully
@@ -2071,35 +2059,14 @@ neon_end_unlogged_build(SMgrRelation reln)
if (unlogged_build_phase != UNLOGGED_BUILD_NOT_PERMANENT)
{
XLogRecPtr recptr;
BlockNumber nblocks;
Assert(unlogged_build_phase == UNLOGGED_BUILD_PHASE_2);
Assert(reln->smgr_relpersistence == RELPERSISTENCE_UNLOGGED);
/*
* Update the last-written LSN cache.
*
* The relation is still on local disk so we can get the size by
* calling mdnblocks() directly. For the LSN, GetXLogInsertRecPtr() is
* very conservative. If we could assume that this function is called
* from the same backend that WAL-logged the contents, we could use
* XactLastRecEnd here. But better safe than sorry.
*/
nblocks = mdnblocks(reln, MAIN_FORKNUM);
recptr = GetXLogInsertRecPtr();
neon_set_lwlsn_block_range(recptr,
InfoFromNInfoB(rinfob),
MAIN_FORKNUM, 0, nblocks);
neon_set_lwlsn_relation(recptr,
InfoFromNInfoB(rinfob),
MAIN_FORKNUM);
/* Make the relation look permanent again */
reln->smgr_relpersistence = RELPERSISTENCE_PERMANENT;
/* Remove local copy */
rinfob = InfoBFromSMgrRel(reln);
for (int forknum = 0; forknum <= MAX_FORKNUM; forknum++)
{
neon_log(SmgrTrace, "forgetting cached relsize for %u/%u/%u.%u",
@@ -2111,12 +2078,12 @@ neon_end_unlogged_build(SMgrRelation reln)
#ifndef DEBUG_COMPARE_LOCAL
/* use isRedo == true, so that we drop it immediately */
mdunlink(rinfob, forknum, true);
#else
mdunlink(rinfob, INIT_FORKNUM, true);
#endif
}
#ifdef DEBUG_COMPARE_LOCAL
mdunlink(rinfob, INIT_FORKNUM, true);
#endif
}
unlogged_build_rel = NULL;
unlogged_build_phase = UNLOGGED_BUILD_NOT_IN_PROGRESS;
}

View File

@@ -890,7 +890,7 @@ libpqwp_connect_start(char *conninfo)
* palloc will exit on failure though, so there's not much we could do if
* it *did* fail.
*/
conn = (WalProposerConn*)MemoryContextAllocZero(TopMemoryContext, sizeof(WalProposerConn));
conn = palloc(sizeof(WalProposerConn));
conn->pg_conn = pg_conn;
conn->is_nonblocking = false; /* connections always start in blocking
* mode */

View File

@@ -776,6 +776,7 @@ impl From<&jose_jwk::Key> for KeyType {
}
#[cfg(test)]
#[expect(clippy::unwrap_used)]
mod tests {
use std::future::IntoFuture;
use std::net::SocketAddr;

View File

@@ -253,6 +253,7 @@ fn project_name_valid(name: &str) -> bool {
}
#[cfg(test)]
#[expect(clippy::unwrap_used)]
mod tests {
use ComputeUserInfoParseError::*;
use serde_json::json;

View File

@@ -258,7 +258,7 @@ async fn ssl_handshake<S: AsyncRead + AsyncWrite + Unpin>(
"unexpected startup packet, rejecting connection"
);
stream
.throw_error_str(ERR_INSECURE_CONNECTION, crate::error::ErrorKind::User, None)
.throw_error_str(ERR_INSECURE_CONNECTION, crate::error::ErrorKind::User)
.await?
}
}

View File

@@ -259,6 +259,7 @@ impl EndpointsCache {
}
#[cfg(test)]
#[expect(clippy::unwrap_used)]
mod tests {
use super::*;

View File

@@ -585,6 +585,7 @@ impl Cache for ProjectInfoCacheImpl {
}
#[cfg(test)]
#[expect(clippy::unwrap_used)]
mod tests {
use super::*;
use crate::scram::ServerSecret;

View File

@@ -222,7 +222,7 @@ pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
{
Ok(auth_result) => auth_result,
Err(e) => {
return stream.throw_error(e, Some(ctx)).await?;
return stream.throw_error(e).await?;
}
};
@@ -238,7 +238,7 @@ pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
config.wake_compute_retry_config,
&config.connect_to_compute,
)
.or_else(|e| stream.throw_error(e, Some(ctx)))
.or_else(|e| stream.throw_error(e))
.await?;
let cancellation_handler_clone = Arc::clone(&cancellation_handler);

View File

@@ -63,7 +63,7 @@ struct RequestContextInner {
success: bool,
pub(crate) cold_start_info: ColdStartInfo,
pg_options: Option<StartupMessageParams>,
testodrome_query_id: Option<SmolStr>,
testodrome_query_id: Option<String>,
// extra
// This sender is here to keep the request monitoring channel open while requests are taking place.
@@ -219,7 +219,7 @@ impl RequestContext {
for option in options_str.split_whitespace() {
if option.starts_with("neon_query_id:") {
if let Some(value) = option.strip_prefix("neon_query_id:") {
this.set_testodrome_id(value.into());
this.set_testodrome_id(value.to_string());
break;
}
}
@@ -272,7 +272,7 @@ impl RequestContext {
.set_user_agent(user_agent);
}
pub(crate) fn set_testodrome_id(&self, query_id: SmolStr) {
pub(crate) fn set_testodrome_id(&self, query_id: String) {
self.0
.try_lock()
.expect("should not deadlock")
@@ -378,7 +378,7 @@ impl RequestContext {
.accumulated()
}
pub(crate) fn get_testodrome_id(&self) -> Option<SmolStr> {
pub(crate) fn get_testodrome_id(&self) -> Option<String> {
self.0
.try_lock()
.expect("should not deadlock")
@@ -447,7 +447,7 @@ impl RequestContextInner {
self.user = Some(user);
}
fn set_testodrome_id(&mut self, query_id: SmolStr) {
fn set_testodrome_id(&mut self, query_id: String) {
self.testodrome_query_id = Some(query_id);
}

View File

@@ -416,6 +416,7 @@ async fn upload_parquet(
}
#[cfg(test)]
#[expect(clippy::unwrap_used)]
mod tests {
use std::net::Ipv4Addr;
use std::num::NonZeroUsize;

View File

@@ -227,6 +227,7 @@ impl From<AccountId> for AccountIdInt {
}
#[cfg(test)]
#[expect(clippy::unwrap_used)]
mod tests {
use std::sync::OnceLock;

View File

@@ -91,7 +91,6 @@ mod jemalloc;
mod logging;
mod metrics;
mod parse;
mod pglb;
mod protocol2;
mod proxy;
mod rate_limiter;

View File

@@ -1032,6 +1032,7 @@ impl<const F: usize> serde::ser::Serialize for ExtractedSpanFields<'_, F> {
}
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
use std::marker::PhantomData;
use std::sync::{Arc, Mutex, MutexGuard};

View File

@@ -1,193 +0,0 @@
#![allow(dead_code, reason = "TODO: work in progress")]
use std::pin::{Pin, pin};
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::task::{Context, Poll};
use std::{fmt, io};
use tokio::io::{AsyncRead, AsyncWrite, DuplexStream, ReadBuf};
use tokio::sync::mpsc;
const STREAM_CHANNEL_SIZE: usize = 16;
const MAX_STREAM_BUFFER_SIZE: usize = 4096;
#[derive(Debug)]
pub struct Connection {
stream_sender: mpsc::Sender<Stream>,
stream_receiver: mpsc::Receiver<Stream>,
stream_id_counter: Arc<AtomicUsize>,
}
impl Connection {
pub fn new() -> (Connection, Connection) {
let (sender_a, receiver_a) = mpsc::channel(STREAM_CHANNEL_SIZE);
let (sender_b, receiver_b) = mpsc::channel(STREAM_CHANNEL_SIZE);
let stream_id_counter = Arc::new(AtomicUsize::new(1));
let conn_a = Connection {
stream_sender: sender_a,
stream_receiver: receiver_b,
stream_id_counter: Arc::clone(&stream_id_counter),
};
let conn_b = Connection {
stream_sender: sender_b,
stream_receiver: receiver_a,
stream_id_counter,
};
(conn_a, conn_b)
}
#[inline]
fn next_stream_id(&self) -> StreamId {
StreamId(self.stream_id_counter.fetch_add(1, Ordering::Relaxed))
}
#[tracing::instrument(skip_all, fields(stream_id = tracing::field::Empty, err))]
pub async fn open_stream(&self) -> io::Result<Stream> {
let (local, remote) = tokio::io::duplex(MAX_STREAM_BUFFER_SIZE);
let stream_id = self.next_stream_id();
tracing::Span::current().record("stream_id", stream_id.0);
let local = Stream {
inner: local,
id: stream_id,
};
let remote = Stream {
inner: remote,
id: stream_id,
};
self.stream_sender
.send(remote)
.await
.map_err(io::Error::other)?;
Ok(local)
}
#[tracing::instrument(skip_all, fields(stream_id = tracing::field::Empty, err))]
pub async fn accept_stream(&mut self) -> io::Result<Option<Stream>> {
Ok(self.stream_receiver.recv().await.inspect(|stream| {
tracing::Span::current().record("stream_id", stream.id.0);
}))
}
}
#[derive(Copy, Clone, Debug)]
pub struct StreamId(usize);
impl fmt::Display for StreamId {
#[inline]
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.0)
}
}
// TODO: Proper closing. Currently Streams can outlive their Connections.
// Carry WeakSender and check strong_count?
#[derive(Debug)]
pub struct Stream {
inner: DuplexStream,
id: StreamId,
}
impl Stream {
#[inline]
pub fn id(&self) -> StreamId {
self.id
}
}
impl AsyncRead for Stream {
#[tracing::instrument(level = "debug", skip_all, fields(stream_id = %self.id))]
#[inline]
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
pin!(&mut self.inner).poll_read(cx, buf)
}
}
impl AsyncWrite for Stream {
#[tracing::instrument(level = "debug", skip_all, fields(stream_id = %self.id))]
#[inline]
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<Result<usize, io::Error>> {
pin!(&mut self.inner).poll_write(cx, buf)
}
#[tracing::instrument(level = "debug", skip_all, fields(stream_id = %self.id))]
#[inline]
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
pin!(&mut self.inner).poll_flush(cx)
}
#[tracing::instrument(level = "debug", skip_all, fields(stream_id = %self.id))]
#[inline]
fn poll_shutdown(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), io::Error>> {
pin!(&mut self.inner).poll_shutdown(cx)
}
#[tracing::instrument(level = "debug", skip_all, fields(stream_id = %self.id))]
#[inline]
fn poll_write_vectored(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
bufs: &[io::IoSlice<'_>],
) -> Poll<Result<usize, io::Error>> {
pin!(&mut self.inner).poll_write_vectored(cx, bufs)
}
#[inline]
fn is_write_vectored(&self) -> bool {
self.inner.is_write_vectored()
}
}
#[cfg(test)]
mod tests {
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use super::*;
#[tokio::test]
async fn test_simple_roundtrip() {
let (client, mut server) = Connection::new();
let server_task = tokio::spawn(async move {
while let Some(mut stream) = server.accept_stream().await.unwrap() {
tokio::spawn(async move {
let mut buf = [0; 64];
loop {
match stream.read(&mut buf).await.unwrap() {
0 => break,
n => stream.write(&buf[..n]).await.unwrap(),
};
}
});
}
});
let mut stream = client.open_stream().await.unwrap();
stream.write_all(b"hello!").await.unwrap();
let mut buf = [0; 64];
let n = stream.read(&mut buf).await.unwrap();
assert_eq!(n, 6);
assert_eq!(&buf[..n], b"hello!");
drop(stream);
drop(client);
server_task.await.unwrap();
}
}

View File

@@ -1 +0,0 @@
pub mod inprocess;

View File

@@ -400,6 +400,7 @@ impl NetworkEndianIpv6 {
}
#[cfg(test)]
#[expect(clippy::unwrap_used)]
mod tests {
use tokio::io::AsyncReadExt;

View File

@@ -262,6 +262,7 @@ impl CopyBuffer {
}
#[cfg(test)]
#[expect(clippy::unwrap_used)]
mod tests {
use tokio::io::AsyncWriteExt;

Some files were not shown because too many files have changed in this diff Show More