mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-31 03:50:37 +00:00
Compare commits
1 Commits
bodobolero
...
tristan957
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3dd61ede6f |
1
.github/actionlint.yml
vendored
1
.github/actionlint.yml
vendored
@@ -20,4 +20,3 @@ config-variables:
|
||||
- REMOTE_STORAGE_AZURE_REGION
|
||||
- SLACK_UPCOMING_RELEASE_CHANNEL_ID
|
||||
- DEV_AWS_OIDC_ROLE_ARN
|
||||
- BENCHMARK_INGEST_TARGET_PROJECTID
|
||||
|
||||
116
.github/dependabot.yml
vendored
Normal file
116
.github/dependabot.yml
vendored
Normal file
@@ -0,0 +1,116 @@
|
||||
version: 2
|
||||
|
||||
updates:
|
||||
- directory: /
|
||||
package-ecosystem: cargo
|
||||
schedule:
|
||||
interval: daily
|
||||
rebase-strategy: auto
|
||||
|
||||
- directory: /
|
||||
package-ecosystem: docker
|
||||
schedule:
|
||||
interval: daily
|
||||
rebase-strategy: auto
|
||||
|
||||
- directory: /
|
||||
package-ecosystem: github-actions
|
||||
schedule:
|
||||
interval: daily
|
||||
rebase-strategy: auto
|
||||
|
||||
- directory: /
|
||||
package-ecosystem: pip
|
||||
schedule:
|
||||
interval: daily
|
||||
rebase-strategy: auto
|
||||
|
||||
- directory: test_runner/pg_clients/csharp/npgsql
|
||||
package-ecosystem: docker
|
||||
schedule:
|
||||
interval: weekly
|
||||
rebase-strategy: auto
|
||||
|
||||
- directory: test_runner/pg_clients/csharp/npgsql
|
||||
package-ecosystem: nuget
|
||||
schedule:
|
||||
interval: weekly
|
||||
rebase-strategy: auto
|
||||
|
||||
- directory: test_runner/pg_clients/java/jdbc/
|
||||
package-ecosystem: docker
|
||||
schedule:
|
||||
interval: weekly
|
||||
rebase-strategy: auto
|
||||
|
||||
- directory: test_runner/pg_clients/python/asyncpg/
|
||||
package-ecosystem: pip
|
||||
schedule:
|
||||
interval: weekly
|
||||
rebase-strategy: auto
|
||||
|
||||
- directory: test_runner/pg_clients/python/pg8000/
|
||||
package-ecosystem: pip
|
||||
schedule:
|
||||
interval: weekly
|
||||
rebase-strategy: auto
|
||||
|
||||
- directory: test_runner/pg_clients/rust/tokio-postgres/
|
||||
package-ecosystem: cargo
|
||||
schedule:
|
||||
interval: weekly
|
||||
rebase-strategy: auto
|
||||
|
||||
- directory: test_runner/pg_clients/rust/tokio-postgres/
|
||||
package-ecosystem: docker
|
||||
schedule:
|
||||
interval: weekly
|
||||
rebase-strategy: auto
|
||||
|
||||
- directory: test_runner/pg_clients/swift/PostgresNIOExample/
|
||||
package-ecosystem: docker
|
||||
schedule:
|
||||
interval: weekly
|
||||
rebase-strategy: auto
|
||||
|
||||
- directory: test_runner/pg_clients/swift/PostgresNIOExample/
|
||||
package-ecosystem: swift
|
||||
schedule:
|
||||
interval: weekly
|
||||
rebase-strategy: auto
|
||||
|
||||
- directory: test_runner/pg_clients/swift/PostgresClientKitExample/
|
||||
package-ecosystem: docker
|
||||
schedule:
|
||||
interval: weekly
|
||||
rebase-strategy: auto
|
||||
|
||||
- directory: test_runner/pg_clients/swift/PostgresClientKitExample/
|
||||
package-ecosystem: swift
|
||||
schedule:
|
||||
interval: weekly
|
||||
rebase-strategy: auto
|
||||
|
||||
- directory: test_runner/pg_clients/typescript/postgresql-client/
|
||||
package-ecosystem: docker
|
||||
schedule:
|
||||
interval: weekly
|
||||
rebase-strategy: auto
|
||||
|
||||
- directory: test_runner/pg_clients/typescript/postgresql-client/
|
||||
package-ecosystem: npm
|
||||
schedule:
|
||||
interval: weekly
|
||||
rebase-strategy: auto
|
||||
|
||||
- directory: test_runner/pg_clients/typescript/serverless-driver/
|
||||
package-ecosystem: docker
|
||||
schedule:
|
||||
interval: weekly
|
||||
rebase-strategy: auto
|
||||
|
||||
- directory: test_runner/pg_clients/typescript/serverless-driver/
|
||||
package-ecosystem: npm
|
||||
schedule:
|
||||
interval: weekly
|
||||
rebase-strategy: auto
|
||||
367
.github/workflows/benchmarking.yml
vendored
367
.github/workflows/benchmarking.yml
vendored
@@ -37,9 +37,9 @@ on:
|
||||
description: 'AWS-RDS and AWS-AURORA normally only run on Saturday. Set this to true to run them on every workflow_dispatch'
|
||||
required: false
|
||||
default: false
|
||||
run_only_ingest_tests:
|
||||
run_only_pgvector_tests:
|
||||
type: boolean
|
||||
description: 'Run ingest tests but no other tests. If not set, all tests including ingest tests will be run'
|
||||
description: 'Run pgvector tests but no other tests. If not set, all tests including pgvector tests will be run'
|
||||
required: false
|
||||
default: false
|
||||
|
||||
@@ -53,355 +53,8 @@ concurrency:
|
||||
cancel-in-progress: true
|
||||
|
||||
jobs:
|
||||
ingest:
|
||||
strategy:
|
||||
matrix:
|
||||
target_project: [new_empty_project, large_existing_project]
|
||||
permissions:
|
||||
contents: write
|
||||
statuses: write
|
||||
id-token: write # aws-actions/configure-aws-credentials
|
||||
env:
|
||||
PG_CONFIG: /tmp/neon/pg_install/v16/bin/pg_config
|
||||
PSQL: /tmp/neon/pg_install/v16/bin/psql
|
||||
PG_16_LIB_PATH: /tmp/neon/pg_install/v16/lib
|
||||
PGCOPYDB: /pgcopydb/bin/pgcopydb
|
||||
PGCOPYDB_LIB_PATH: /pgcopydb/lib
|
||||
runs-on: [ self-hosted, us-east-2, x64 ]
|
||||
container:
|
||||
image: neondatabase/build-tools:pinned-bookworm
|
||||
credentials:
|
||||
username: ${{ secrets.NEON_DOCKERHUB_USERNAME }}
|
||||
password: ${{ secrets.NEON_DOCKERHUB_PASSWORD }}
|
||||
options: --init
|
||||
timeout-minutes: 1440
|
||||
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
|
||||
- name: Configure AWS credentials # necessary on Azure runners
|
||||
uses: aws-actions/configure-aws-credentials@v4
|
||||
with:
|
||||
aws-region: eu-central-1
|
||||
role-to-assume: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
|
||||
role-duration-seconds: 18000 # 5 hours is currently max associated with IAM role
|
||||
|
||||
- name: Download Neon artifact
|
||||
uses: ./.github/actions/download
|
||||
with:
|
||||
name: neon-${{ runner.os }}-${{ runner.arch }}-release-artifact
|
||||
path: /tmp/neon/
|
||||
prefix: latest
|
||||
|
||||
- name: Create Neon Project
|
||||
if: ${{ matrix.target_project == 'new_empty_project' }}
|
||||
id: create-neon-project-ingest-target
|
||||
uses: ./.github/actions/neon-project-create
|
||||
with:
|
||||
region_id: aws-us-east-2
|
||||
postgres_version: 16
|
||||
compute_units: '[7, 7]' # we want to test large compute here to avoid compute-side bottleneck
|
||||
api_key: ${{ secrets.NEON_STAGING_API_KEY }}
|
||||
|
||||
- name: Initialize Neon project and retrieve current backpressure seconds
|
||||
if: ${{ matrix.target_project == 'new_empty_project' }}
|
||||
env:
|
||||
NEW_PROJECT_CONNSTR: ${{ steps.create-neon-project-ingest-target.outputs.dsn }}
|
||||
NEW_PROJECT_ID: ${{ steps.create-neon-project-ingest-target.outputs.project_id }}
|
||||
run: |
|
||||
echo "Initializing Neon project with project_id: ${NEW_PROJECT_ID}"
|
||||
export LD_LIBRARY_PATH=${PG_16_LIB_PATH}
|
||||
${PSQL} "${NEW_PROJECT_CONNSTR}" -c "CREATE EXTENSION IF NOT EXISTS neon; CREATE EXTENSION IF NOT EXISTS neon_utils;"
|
||||
BACKPRESSURE_TIME_BEFORE_INGEST=$(${PSQL} "${NEW_PROJECT_CONNSTR}" -t -c "select backpressure_throttling_time()/1000000;")
|
||||
echo "BACKPRESSURE_TIME_BEFORE_INGEST=${BACKPRESSURE_TIME_BEFORE_INGEST}" >> $GITHUB_ENV
|
||||
echo "NEW_PROJECT_CONNSTR=${NEW_PROJECT_CONNSTR}" >> $GITHUB_ENV
|
||||
|
||||
- name: Create Neon Branch for large tenant
|
||||
if: ${{ matrix.target_project == 'large_existing_project' }}
|
||||
id: create-neon-branch-ingest-target
|
||||
uses: ./.github/actions/neon-branch-create
|
||||
with:
|
||||
project_id: ${{ vars.BENCHMARK_INGEST_TARGET_PROJECTID }}
|
||||
api_key: ${{ secrets.NEON_STAGING_API_KEY }}
|
||||
|
||||
- name: Initialize Neon project and retrieve current backpressure seconds
|
||||
if: ${{ matrix.target_project == 'large_existing_project' }}
|
||||
env:
|
||||
NEW_PROJECT_CONNSTR: ${{ steps.create-neon-branch-ingest-target.outputs.dsn }}
|
||||
NEW_BRANCH_ID: ${{ steps.create-neon-branch-ingest-target.outputs.branch_id }}
|
||||
run: |
|
||||
echo "Initializing Neon branch with branch_id: ${NEW_BRANCH_ID}"
|
||||
export LD_LIBRARY_PATH=${PG_16_LIB_PATH}
|
||||
# Extract the part before the database name
|
||||
base_connstr="${NEW_PROJECT_CONNSTR%/*}"
|
||||
# Extract the query parameters (if any) after the database name
|
||||
query_params="${NEW_PROJECT_CONNSTR#*\?}"
|
||||
# Reconstruct the new connection string
|
||||
if [ "$query_params" != "$NEW_PROJECT_CONNSTR" ]; then
|
||||
new_connstr="${base_connstr}/neondb?${query_params}"
|
||||
else
|
||||
new_connstr="${base_connstr}/neondb"
|
||||
fi
|
||||
${PSQL} "${new_connstr}" -c "drop database ludicrous;"
|
||||
${PSQL} "${new_connstr}" -c "CREATE DATABASE ludicrous;"
|
||||
if [ "$query_params" != "$NEW_PROJECT_CONNSTR" ]; then
|
||||
NEW_PROJECT_CONNSTR="${base_connstr}/ludicrous?${query_params}"
|
||||
else
|
||||
NEW_PROJECT_CONNSTR="${base_connstr}/ludicrous"
|
||||
fi
|
||||
${PSQL} "${NEW_PROJECT_CONNSTR}" -c "CREATE EXTENSION IF NOT EXISTS neon; CREATE EXTENSION IF NOT EXISTS neon_utils;"
|
||||
BACKPRESSURE_TIME_BEFORE_INGEST=$(${PSQL} "${NEW_PROJECT_CONNSTR}" -t -c "select backpressure_throttling_time()/1000000;")
|
||||
echo "BACKPRESSURE_TIME_BEFORE_INGEST=${BACKPRESSURE_TIME_BEFORE_INGEST}" >> $GITHUB_ENV
|
||||
echo "NEW_PROJECT_CONNSTR=${NEW_PROJECT_CONNSTR}" >> $GITHUB_ENV
|
||||
|
||||
|
||||
- name: Create pgcopydb filter file
|
||||
run: |
|
||||
cat << EOF > /tmp/pgcopydb_filter.txt
|
||||
[include-only-table]
|
||||
public.events
|
||||
public.emails
|
||||
public.email_transmissions
|
||||
public.payments
|
||||
public.editions
|
||||
public.edition_modules
|
||||
public.sp_content
|
||||
public.email_broadcasts
|
||||
public.user_collections
|
||||
public.devices
|
||||
public.user_accounts
|
||||
public.lessons
|
||||
public.lesson_users
|
||||
public.payment_methods
|
||||
public.orders
|
||||
public.course_emails
|
||||
public.modules
|
||||
public.users
|
||||
public.module_users
|
||||
public.courses
|
||||
public.payment_gateway_keys
|
||||
public.accounts
|
||||
public.roles
|
||||
public.payment_gateways
|
||||
public.management
|
||||
public.event_names
|
||||
EOF
|
||||
|
||||
- name: Invoke pgcopydb
|
||||
env:
|
||||
BENCHMARK_INGEST_SOURCE_CONNSTR: ${{ secrets.BENCHMARK_INGEST_SOURCE_CONNSTR }}
|
||||
run: |
|
||||
export LD_LIBRARY_PATH=${PGCOPYDB_LIB_PATH}:${PG_16_LIB_PATH}
|
||||
export PGCOPYDB_SOURCE_PGURI="${BENCHMARK_INGEST_SOURCE_CONNSTR}"
|
||||
export PGCOPYDB_TARGET_PGURI="${NEW_PROJECT_CONNSTR}"
|
||||
export PGOPTIONS="-c maintenance_work_mem=8388608 -c max_parallel_maintenance_workers=7"
|
||||
${PG_CONFIG} --bindir
|
||||
${PGCOPYDB} --version
|
||||
${PGCOPYDB} clone --skip-vacuum --no-owner --no-acl --skip-db-properties --table-jobs 4 \
|
||||
--index-jobs 4 --restore-jobs 4 --split-tables-larger-than 10GB --skip-extensions \
|
||||
--use-copy-binary --filters /tmp/pgcopydb_filter.txt 2>&1 | tee /tmp/pgcopydb_${{ matrix.target_project }}.log
|
||||
|
||||
# create dummy pgcopydb log to test parsing
|
||||
# - name: create dummy log for parser test
|
||||
# run: |
|
||||
# cat << EOF > /tmp/pgcopydb_${{ matrix.target_project }}.log
|
||||
# 2024-11-04 18:00:53.433 500861 INFO main.c:136 Running pgcopydb version 0.17.10.g8361a93 from "/usr/lib/postgresql/17/bin/pgcopydb"
|
||||
# 2024-11-04 18:00:53.434 500861 INFO cli_common.c:1225 [SOURCE] Copying database from "postgres://neondb_owner@ep-bitter-shape-w2c1ir0a.us-east-2.aws.neon.build/neondb?sslmode=require&keepalives=1&keepalives_idle=10&keepalives_interval=10&keepalives_count=60"
|
||||
# 2024-11-04 18:00:53.434 500861 INFO cli_common.c:1226 [TARGET] Copying database into "postgres://neondb_owner@ep-icy-union-w25qd5pj.us-east-2.aws.neon.build/ludicrous?sslmode=require&keepalives=1&keepalives_idle=10&keepalives_interval=10&keepalives_count=60"
|
||||
# 2024-11-04 18:00:53.442 500861 INFO copydb.c:105 Using work dir "/tmp/pgcopydb"
|
||||
# 2024-11-04 18:00:53.541 500861 INFO snapshot.c:107 Exported snapshot "00000008-00000033-1" from the source database
|
||||
# 2024-11-04 18:00:53.556 500865 INFO cli_clone_follow.c:543 STEP 1: fetch source database tables, indexes, and sequences
|
||||
# 2024-11-04 18:00:54.570 500865 INFO copydb_schema.c:716 Splitting source candidate tables larger than 10 GB
|
||||
# 2024-11-04 18:00:54.570 500865 INFO copydb_schema.c:829 Table public.events is 96 GB large which is larger than --split-tables-larger-than 10 GB, and does not have a unique column of type integer: splitting by CTID
|
||||
# 2024-11-04 18:01:05.538 500865 INFO copydb_schema.c:905 Table public.events is 96 GB large, 10 COPY processes will be used, partitioning on ctid.
|
||||
# 2024-11-04 18:01:05.564 500865 INFO copydb_schema.c:905 Table public.email_transmissions is 27 GB large, 4 COPY processes will be used, partitioning on id.
|
||||
# 2024-11-04 18:01:05.584 500865 INFO copydb_schema.c:905 Table public.lessons is 25 GB large, 4 COPY processes will be used, partitioning on id.
|
||||
# 2024-11-04 18:01:05.605 500865 INFO copydb_schema.c:905 Table public.lesson_users is 16 GB large, 3 COPY processes will be used, partitioning on id.
|
||||
# 2024-11-04 18:01:05.605 500865 INFO copydb_schema.c:761 Fetched information for 26 tables (including 4 tables split in 21 partitions total), with an estimated total of 907 million tuples and 175 GB on-disk
|
||||
# 2024-11-04 18:01:05.687 500865 INFO copydb_schema.c:968 Fetched information for 57 indexes (supporting 25 constraints)
|
||||
# 2024-11-04 18:01:05.753 500865 INFO sequences.c:78 Fetching information for 24 sequences
|
||||
# 2024-11-04 18:01:05.903 500865 INFO copydb_schema.c:1122 Fetched information for 4 extensions
|
||||
# 2024-11-04 18:01:06.178 500865 INFO copydb_schema.c:1538 Found 0 indexes (supporting 0 constraints) in the target database
|
||||
# 2024-11-04 18:01:06.184 500865 INFO cli_clone_follow.c:584 STEP 2: dump the source database schema (pre/post data)
|
||||
# 2024-11-04 18:01:06.186 500865 INFO pgcmd.c:468 /usr/lib/postgresql/16/bin/pg_dump -Fc --snapshot 00000008-00000033-1 --section=pre-data --section=post-data --file /tmp/pgcopydb/schema/schema.dump 'postgres://neondb_owner@ep-bitter-shape-w2c1ir0a.us-east-2.aws.neon.build/neondb?sslmode=require&keepalives=1&keepalives_idle=10&keepalives_interval=10&keepalives_count=60'
|
||||
# 2024-11-04 18:01:06.952 500865 INFO cli_clone_follow.c:592 STEP 3: restore the pre-data section to the target database
|
||||
# 2024-11-04 18:01:07.004 500865 INFO pgcmd.c:1001 /usr/lib/postgresql/16/bin/pg_restore --dbname 'postgres://neondb_owner@ep-icy-union-w25qd5pj.us-east-2.aws.neon.build/ludicrous?sslmode=require&keepalives=1&keepalives_idle=10&keepalives_interval=10&keepalives_count=60' --section pre-data --jobs 4 --no-owner --no-acl --use-list /tmp/pgcopydb/schema/pre-filtered.list /tmp/pgcopydb/schema/schema.dump
|
||||
# 2024-11-04 18:01:07.438 500874 INFO table-data.c:656 STEP 4: starting 4 table-data COPY processes
|
||||
# 2024-11-04 18:01:07.451 500877 INFO vacuum.c:139 STEP 8: skipping VACUUM jobs per --skip-vacuum
|
||||
# 2024-11-04 18:01:07.457 500875 INFO indexes.c:182 STEP 6: starting 4 CREATE INDEX processes
|
||||
# 2024-11-04 18:01:07.457 500875 INFO indexes.c:183 STEP 7: constraints are built by the CREATE INDEX processes
|
||||
# 2024-11-04 18:01:07.507 500865 INFO blobs.c:74 Skipping large objects: none found.
|
||||
# 2024-11-04 18:01:07.509 500865 INFO sequences.c:194 STEP 9: reset sequences values
|
||||
# 2024-11-04 18:01:07.510 500886 INFO sequences.c:290 Set sequences values on the target database
|
||||
# 2024-11-04 20:49:00.587 500865 INFO cli_clone_follow.c:608 STEP 10: restore the post-data section to the target database
|
||||
# 2024-11-04 20:49:00.600 500865 INFO pgcmd.c:1001 /usr/lib/postgresql/16/bin/pg_restore --dbname 'postgres://neondb_owner@ep-icy-union-w25qd5pj.us-east-2.aws.neon.build/ludicrous?sslmode=require&keepalives=1&keepalives_idle=10&keepalives_interval=10&keepalives_count=60' --section post-data --jobs 4 --no-owner --no-acl --use-list /tmp/pgcopydb/schema/post-filtered.list /tmp/pgcopydb/schema/schema.dump
|
||||
# 2024-11-05 10:50:58.508 500865 INFO cli_clone_follow.c:639 All step are now done, 16h49m elapsed
|
||||
# 2024-11-05 10:50:58.508 500865 INFO summary.c:3155 Printing summary for 26 tables and 57 indexes
|
||||
|
||||
# OID | Schema | Name | Parts | copy duration | transmitted bytes | indexes | create index duration
|
||||
# ------+--------+----------------------+-------+---------------+-------------------+---------+----------------------
|
||||
# 24654 | public | events | 10 | 1d11h | 878 GB | 1 | 1h41m
|
||||
# 24623 | public | email_transmissions | 4 | 4h46m | 99 GB | 3 | 2h04m
|
||||
# 24665 | public | lessons | 4 | 4h42m | 161 GB | 4 | 1m11s
|
||||
# 24661 | public | lesson_users | 3 | 2h46m | 49 GB | 3 | 39m35s
|
||||
# 24631 | public | emails | 1 | 34m07s | 10 GB | 2 | 17s
|
||||
# 24739 | public | payments | 1 | 5m47s | 1848 MB | 4 | 4m40s
|
||||
# 24681 | public | module_users | 1 | 4m57s | 1610 MB | 3 | 1m50s
|
||||
# 24694 | public | orders | 1 | 2m50s | 835 MB | 3 | 1m05s
|
||||
# 24597 | public | devices | 1 | 1m45s | 498 MB | 2 | 40s
|
||||
# 24723 | public | payment_methods | 1 | 1m24s | 548 MB | 2 | 31s
|
||||
# 24765 | public | user_collections | 1 | 2m17s | 1005 MB | 2 | 968ms
|
||||
# 24774 | public | users | 1 | 52s | 291 MB | 4 | 27s
|
||||
# 24760 | public | user_accounts | 1 | 16s | 172 MB | 3 | 16s
|
||||
# 24606 | public | edition_modules | 1 | 8s983 | 46 MB | 3 | 4s749
|
||||
# 24583 | public | course_emails | 1 | 8s526 | 26 MB | 2 | 996ms
|
||||
# 24685 | public | modules | 1 | 1s592 | 21 MB | 3 | 1s696
|
||||
# 24610 | public | editions | 1 | 2s199 | 7483 kB | 2 | 1s032
|
||||
# 24755 | public | sp_content | 1 | 1s555 | 4177 kB | 0 | 0ms
|
||||
# 24619 | public | email_broadcasts | 1 | 744ms | 2645 kB | 2 | 677ms
|
||||
# 24590 | public | courses | 1 | 387ms | 1540 kB | 2 | 367ms
|
||||
# 24704 | public | payment_gateway_keys | 1 | 1s972 | 164 kB | 2 | 27ms
|
||||
# 24576 | public | accounts | 1 | 58ms | 24 kB | 1 | 14ms
|
||||
# 24647 | public | event_names | 1 | 32ms | 397 B | 1 | 8ms
|
||||
# 24716 | public | payment_gateways | 1 | 1s675 | 117 B | 1 | 11ms
|
||||
# 24748 | public | roles | 1 | 71ms | 173 B | 1 | 8ms
|
||||
# 24676 | public | management | 1 | 33ms | 40 B | 1 | 19ms
|
||||
|
||||
|
||||
# Step Connection Duration Transfer Concurrency
|
||||
# -------------------------------------------------- ---------- ---------- ---------- ------------
|
||||
# Catalog Queries (table ordering, filtering, etc) source 12s 1
|
||||
# Dump Schema source 765ms 1
|
||||
# Prepare Schema target 466ms 1
|
||||
# COPY, INDEX, CONSTRAINTS, VACUUM (wall clock) both 2h47m 12
|
||||
# COPY (cumulative) both 7h46m 1225 GB 4
|
||||
# CREATE INDEX (cumulative) target 4h36m 4
|
||||
# CONSTRAINTS (cumulative) target 8s493 4
|
||||
# VACUUM (cumulative) target 0ms 4
|
||||
# Reset Sequences both 60ms 1
|
||||
# Large Objects (cumulative) (null) 0ms 0
|
||||
# Finalize Schema both 14h01m 4
|
||||
# -------------------------------------------------- ---------- ---------- ---------- ------------
|
||||
# Total Wall Clock Duration both 16h49m 20
|
||||
|
||||
|
||||
# EOF
|
||||
|
||||
|
||||
- name: show tables sizes and retrieve current backpressure seconds
|
||||
run: |
|
||||
export LD_LIBRARY_PATH=${PG_16_LIB_PATH}
|
||||
${PSQL} "${NEW_PROJECT_CONNSTR}" -c "\dt+"
|
||||
BACKPRESSURE_TIME_AFTER_INGEST=$(${PSQL} "${NEW_PROJECT_CONNSTR}" -t -c "select backpressure_throttling_time()/1000000;")
|
||||
echo "BACKPRESSURE_TIME_AFTER_INGEST=${BACKPRESSURE_TIME_AFTER_INGEST}" >> $GITHUB_ENV
|
||||
|
||||
- name: Parse pgcopydb log and report performance metrics
|
||||
env:
|
||||
PERF_TEST_RESULT_CONNSTR: ${{ secrets.PERF_TEST_RESULT_CONNSTR }}
|
||||
run: |
|
||||
export LD_LIBRARY_PATH=${PG_16_LIB_PATH}
|
||||
|
||||
# Define the log file path
|
||||
LOG_FILE="/tmp/pgcopydb_${{ matrix.target_project }}.log"
|
||||
|
||||
# Get the current git commit hash
|
||||
git config --global --add safe.directory /__w/neon/neon
|
||||
COMMIT_HASH=$(git rev-parse --short HEAD)
|
||||
|
||||
# Define the platform and test suite
|
||||
PLATFORM="pg16-${{ matrix.target_project }}-us-east-2-staging"
|
||||
SUIT="pgcopydb_ingest_bench"
|
||||
|
||||
# Function to convert time (e.g., "2h47m", "4h36m", "118ms", "8s493") to seconds
|
||||
convert_to_seconds() {
|
||||
local duration=$1
|
||||
local total_seconds=0
|
||||
|
||||
# Check for hours (h)
|
||||
if [[ "$duration" =~ ([0-9]+)h ]]; then
|
||||
total_seconds=$((total_seconds + ${BASH_REMATCH[1]#0} * 3600))
|
||||
fi
|
||||
|
||||
# Check for seconds (s)
|
||||
if [[ "$duration" =~ ([0-9]+)s ]]; then
|
||||
total_seconds=$((total_seconds + ${BASH_REMATCH[1]#0}))
|
||||
fi
|
||||
|
||||
# Check for milliseconds (ms) (if applicable)
|
||||
if [[ "$duration" =~ ([0-9]+)ms ]]; then
|
||||
total_seconds=$((total_seconds + ${BASH_REMATCH[1]#0} / 1000))
|
||||
duration=${duration/${BASH_REMATCH[0]}/} # need to remove it to avoid double counting with m
|
||||
fi
|
||||
|
||||
# Check for minutes (m) - must be checked after ms because m is contained in ms
|
||||
if [[ "$duration" =~ ([0-9]+)m ]]; then
|
||||
total_seconds=$((total_seconds + ${BASH_REMATCH[1]#0} * 60))
|
||||
fi
|
||||
|
||||
echo $total_seconds
|
||||
}
|
||||
|
||||
# Calculate the backpressure difference in seconds
|
||||
BACKPRESSURE_TIME_DIFF=$(awk "BEGIN {print $BACKPRESSURE_TIME_AFTER_INGEST - $BACKPRESSURE_TIME_BEFORE_INGEST}")
|
||||
|
||||
# Insert the backpressure time difference into the performance database
|
||||
if [ -n "$BACKPRESSURE_TIME_DIFF" ]; then
|
||||
PSQL_CMD="${PSQL} \"${PERF_TEST_RESULT_CONNSTR}\" -c \"
|
||||
INSERT INTO public.perf_test_results (suit, revision, platform, metric_name, metric_value, metric_unit, metric_report_type, recorded_at_timestamp)
|
||||
VALUES ('${SUIT}', '${COMMIT_HASH}', '${PLATFORM}', 'backpressure_time', ${BACKPRESSURE_TIME_DIFF}, 'seconds', 'lower_is_better', now());
|
||||
\""
|
||||
echo "Inserting backpressure time difference: ${BACKPRESSURE_TIME_DIFF} seconds"
|
||||
eval $PSQL_CMD
|
||||
fi
|
||||
|
||||
# Extract and process log lines
|
||||
while IFS= read -r line; do
|
||||
METRIC_NAME=""
|
||||
# Match each desired line and extract the relevant information
|
||||
if [[ "$line" =~ COPY,\ INDEX,\ CONSTRAINTS,\ VACUUM.* ]]; then
|
||||
METRIC_NAME="COPY, INDEX, CONSTRAINTS, VACUUM (wall clock)"
|
||||
elif [[ "$line" =~ COPY\ \(cumulative\).* ]]; then
|
||||
METRIC_NAME="COPY (cumulative)"
|
||||
elif [[ "$line" =~ CREATE\ INDEX\ \(cumulative\).* ]]; then
|
||||
METRIC_NAME="CREATE INDEX (cumulative)"
|
||||
elif [[ "$line" =~ CONSTRAINTS\ \(cumulative\).* ]]; then
|
||||
METRIC_NAME="CONSTRAINTS (cumulative)"
|
||||
elif [[ "$line" =~ Finalize\ Schema.* ]]; then
|
||||
METRIC_NAME="Finalize Schema"
|
||||
elif [[ "$line" =~ Total\ Wall\ Clock\ Duration.* ]]; then
|
||||
METRIC_NAME="Total Wall Clock Duration"
|
||||
fi
|
||||
|
||||
# If a metric was matched, insert it into the performance database
|
||||
if [ -n "$METRIC_NAME" ]; then
|
||||
DURATION=$(echo "$line" | grep -oP '\d+h\d+m|\d+s|\d+ms|\d{1,2}h\d{1,2}m|\d+\.\d+s' | head -n 1)
|
||||
METRIC_VALUE=$(convert_to_seconds "$DURATION")
|
||||
PSQL_CMD="${PSQL} \"${PERF_TEST_RESULT_CONNSTR}\" -c \"
|
||||
INSERT INTO public.perf_test_results (suit, revision, platform, metric_name, metric_value, metric_unit, metric_report_type, recorded_at_timestamp)
|
||||
VALUES ('${SUIT}', '${COMMIT_HASH}', '${PLATFORM}', '${METRIC_NAME}', ${METRIC_VALUE}, 'seconds', 'lower_is_better', now());
|
||||
\""
|
||||
echo "Inserting ${METRIC_NAME} with value ${METRIC_VALUE} seconds"
|
||||
eval $PSQL_CMD
|
||||
fi
|
||||
done < "$LOG_FILE"
|
||||
|
||||
- name: Delete Neon Project
|
||||
if: ${{ always() && matrix.target_project == 'new_empty_project' }}
|
||||
uses: ./.github/actions/neon-project-delete
|
||||
with:
|
||||
project_id: ${{ steps.create-neon-project-ingest-target.outputs.project_id }}
|
||||
api_key: ${{ secrets.NEON_STAGING_API_KEY }}
|
||||
|
||||
- name: Delete Neon Branch for large tenant
|
||||
if: ${{ always() && matrix.target_project == 'large_existing_project' }}
|
||||
uses: ./.github/actions/neon-branch-delete
|
||||
with:
|
||||
project_id: ${{ vars.BENCHMARK_INGEST_TARGET_PROJECTID }}
|
||||
branch_id: ${{ steps.create-neon-branch-ingest-target.outputs.branch_id }}
|
||||
api_key: ${{ secrets.NEON_STAGING_API_KEY }}
|
||||
|
||||
bench:
|
||||
if: ${{ github.event.inputs.run_only_ingest_tests == 'false' || github.event.inputs.run_only_ingest_tests == null }}
|
||||
if: ${{ github.event.inputs.run_only_pgvector_tests == 'false' || github.event.inputs.run_only_pgvector_tests == null }}
|
||||
permissions:
|
||||
contents: write
|
||||
statuses: write
|
||||
@@ -510,7 +163,7 @@ jobs:
|
||||
SLACK_BOT_TOKEN: ${{ secrets.SLACK_BOT_TOKEN }}
|
||||
|
||||
replication-tests:
|
||||
if: ${{ github.event.inputs.run_only_ingest_tests == 'false' || github.event.inputs.run_only_ingest_tests == null }}
|
||||
if: ${{ github.event.inputs.run_only_pgvector_tests == 'false' || github.event.inputs.run_only_pgvector_tests == null }}
|
||||
permissions:
|
||||
contents: write
|
||||
statuses: write
|
||||
@@ -600,7 +253,7 @@ jobs:
|
||||
SLACK_BOT_TOKEN: ${{ secrets.SLACK_BOT_TOKEN }}
|
||||
|
||||
generate-matrices:
|
||||
if: ${{ github.event.inputs.run_only_ingest_tests == 'false' || github.event.inputs.run_only_ingest_tests == null }}
|
||||
if: ${{ github.event.inputs.run_only_pgvector_tests == 'false' || github.event.inputs.run_only_pgvector_tests == null }}
|
||||
# Create matrices for the benchmarking jobs, so we run benchmarks on rds only once a week (on Saturday)
|
||||
#
|
||||
# Available platforms:
|
||||
@@ -695,12 +348,11 @@ jobs:
|
||||
echo "matrix=$(echo "$matrix" | jq --compact-output '.')" >> $GITHUB_OUTPUT
|
||||
|
||||
prepare_AWS_RDS_databases:
|
||||
if: ${{ github.event.inputs.run_only_ingest_tests == 'false' || github.event.inputs.run_only_ingest_tests == null }}
|
||||
uses: ./.github/workflows/_benchmarking_preparation.yml
|
||||
secrets: inherit
|
||||
|
||||
pgbench-compare:
|
||||
if: ${{ github.event.inputs.run_only_ingest_tests == 'false' || github.event.inputs.run_only_ingest_tests == null }}
|
||||
if: ${{ github.event.inputs.run_only_pgvector_tests == 'false' || github.event.inputs.run_only_pgvector_tests == null }}
|
||||
needs: [ generate-matrices, prepare_AWS_RDS_databases ]
|
||||
permissions:
|
||||
contents: write
|
||||
@@ -850,7 +502,6 @@ jobs:
|
||||
SLACK_BOT_TOKEN: ${{ secrets.SLACK_BOT_TOKEN }}
|
||||
|
||||
pgbench-pgvector:
|
||||
if: ${{ github.event.inputs.run_only_ingest_tests == 'false' || github.event.inputs.run_only_ingest_tests == null }}
|
||||
permissions:
|
||||
contents: write
|
||||
statuses: write
|
||||
@@ -991,7 +642,7 @@ jobs:
|
||||
#
|
||||
# *_CLICKBENCH_CONNSTR: Genuine ClickBench DB with ~100M rows
|
||||
# *_CLICKBENCH_10M_CONNSTR: DB with the first 10M rows of ClickBench DB
|
||||
if: ${{ !cancelled() && (github.event.inputs.run_only_ingest_tests == 'false' || github.event.inputs.run_only_ingest_tests == null) }}
|
||||
if: ${{ !cancelled() && (github.event.inputs.run_only_pgvector_tests == 'false' || github.event.inputs.run_only_pgvector_tests == null) }}
|
||||
permissions:
|
||||
contents: write
|
||||
statuses: write
|
||||
@@ -1103,7 +754,7 @@ jobs:
|
||||
# We might change it after https://github.com/neondatabase/neon/issues/2900.
|
||||
#
|
||||
# *_TPCH_S10_CONNSTR: DB generated with scale factor 10 (~10 GB)
|
||||
if: ${{ !cancelled() && (github.event.inputs.run_only_ingest_tests == 'false' || github.event.inputs.run_only_ingest_tests == null) }}
|
||||
if: ${{ !cancelled() && (github.event.inputs.run_only_pgvector_tests == 'false' || github.event.inputs.run_only_pgvector_tests == null) }}
|
||||
permissions:
|
||||
contents: write
|
||||
statuses: write
|
||||
@@ -1209,7 +860,7 @@ jobs:
|
||||
SLACK_BOT_TOKEN: ${{ secrets.SLACK_BOT_TOKEN }}
|
||||
|
||||
user-examples-compare:
|
||||
if: ${{ !cancelled() && (github.event.inputs.run_only_ingest_tests == 'false' || github.event.inputs.run_only_ingest_tests == null) }}
|
||||
if: ${{ !cancelled() && (github.event.inputs.run_only_pgvector_tests == 'false' || github.event.inputs.run_only_pgvector_tests == null) }}
|
||||
permissions:
|
||||
contents: write
|
||||
statuses: write
|
||||
|
||||
152
Cargo.lock
generated
152
Cargo.lock
generated
@@ -310,6 +310,33 @@ dependencies = [
|
||||
"zeroize",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "aws-lc-rs"
|
||||
version = "1.9.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "2f95446d919226d587817a7d21379e6eb099b97b45110a7f272a444ca5c54070"
|
||||
dependencies = [
|
||||
"aws-lc-sys",
|
||||
"mirai-annotations",
|
||||
"paste",
|
||||
"zeroize",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "aws-lc-sys"
|
||||
version = "0.21.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b3ddc4a5b231dd6958b140ff3151b6412b3f4321fab354f399eec8f14b06df62"
|
||||
dependencies = [
|
||||
"bindgen 0.69.5",
|
||||
"cc",
|
||||
"cmake",
|
||||
"dunce",
|
||||
"fs_extra",
|
||||
"libc",
|
||||
"paste",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "aws-runtime"
|
||||
version = "1.4.3"
|
||||
@@ -915,6 +942,29 @@ dependencies = [
|
||||
"serde",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "bindgen"
|
||||
version = "0.69.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "271383c67ccabffb7381723dea0672a673f292304fcb45c01cc648c7a8d58088"
|
||||
dependencies = [
|
||||
"bitflags 2.4.1",
|
||||
"cexpr",
|
||||
"clang-sys",
|
||||
"itertools 0.10.5",
|
||||
"lazy_static",
|
||||
"lazycell",
|
||||
"log",
|
||||
"prettyplease",
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"regex",
|
||||
"rustc-hash",
|
||||
"shlex",
|
||||
"syn 2.0.52",
|
||||
"which",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "bindgen"
|
||||
version = "0.70.1"
|
||||
@@ -924,7 +974,7 @@ dependencies = [
|
||||
"bitflags 2.4.1",
|
||||
"cexpr",
|
||||
"clang-sys",
|
||||
"itertools 0.12.1",
|
||||
"itertools 0.10.5",
|
||||
"log",
|
||||
"prettyplease",
|
||||
"proc-macro2",
|
||||
@@ -1170,6 +1220,15 @@ version = "0.5.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "2da6da31387c7e4ef160ffab6d5e7f00c42626fe39aea70a7b0f1773f7dd6c1b"
|
||||
|
||||
[[package]]
|
||||
name = "cmake"
|
||||
version = "0.1.51"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "fb1e43aa7fd152b1f968787f7dbcdeb306d1867ff373c69955211876c053f91a"
|
||||
dependencies = [
|
||||
"cc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "colorchoice"
|
||||
version = "1.0.0"
|
||||
@@ -1270,9 +1329,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "const-oid"
|
||||
version = "0.9.6"
|
||||
version = "0.9.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c2459377285ad874054d797f3ccebf984978aa39129f6eafde5cdc8315b612f8"
|
||||
checksum = "28c122c3980598d243d63d9a704629a2d748d101f278052ff068be5a4423ab6f"
|
||||
|
||||
[[package]]
|
||||
name = "const-random"
|
||||
@@ -1756,6 +1815,12 @@ dependencies = [
|
||||
"syn 2.0.52",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "dunce"
|
||||
version = "1.0.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "92773504d58c093f6de2459af4af33faa518c13451eb8f2b5698ed3d36e7c813"
|
||||
|
||||
[[package]]
|
||||
name = "dyn-clone"
|
||||
version = "1.0.14"
|
||||
@@ -2060,6 +2125,12 @@ dependencies = [
|
||||
"tokio-util",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "fs_extra"
|
||||
version = "1.3.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "42703706b716c37f96a77aea830392ad231f44c9e9a67872fa5548707e11b11c"
|
||||
|
||||
[[package]]
|
||||
name = "fsevent-sys"
|
||||
version = "4.1.0"
|
||||
@@ -2413,6 +2484,15 @@ dependencies = [
|
||||
"digest",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "home"
|
||||
version = "0.5.9"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e3d1354bf6b7235cb4a0576c2619fd4ed18183f689b12b006a0ee7329eeff9a5"
|
||||
dependencies = [
|
||||
"windows-sys 0.52.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "hostname"
|
||||
version = "0.4.0"
|
||||
@@ -2908,6 +2988,12 @@ dependencies = [
|
||||
"spin",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "lazycell"
|
||||
version = "1.3.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55"
|
||||
|
||||
[[package]]
|
||||
name = "libc"
|
||||
version = "0.2.150"
|
||||
@@ -3138,6 +3224,12 @@ dependencies = [
|
||||
"windows-sys 0.48.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "mirai-annotations"
|
||||
version = "1.12.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c9be0862c1b3f26a88803c4a49de6889c10e608b3ee9344e6ef5b45fb37ad3d1"
|
||||
|
||||
[[package]]
|
||||
name = "multimap"
|
||||
version = "0.8.3"
|
||||
@@ -4055,7 +4147,7 @@ dependencies = [
|
||||
"bytes",
|
||||
"once_cell",
|
||||
"pq_proto",
|
||||
"rustls 0.23.16",
|
||||
"rustls 0.23.7",
|
||||
"rustls-pemfile 2.1.1",
|
||||
"serde",
|
||||
"thiserror",
|
||||
@@ -4084,7 +4176,7 @@ name = "postgres_ffi"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"bindgen",
|
||||
"bindgen 0.70.1",
|
||||
"bytes",
|
||||
"crc32c",
|
||||
"env_logger",
|
||||
@@ -4222,7 +4314,7 @@ checksum = "0c1318b19085f08681016926435853bbf7858f9c082d0999b80550ff5d9abe15"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"heck 0.5.0",
|
||||
"itertools 0.12.1",
|
||||
"itertools 0.10.5",
|
||||
"log",
|
||||
"multimap",
|
||||
"once_cell",
|
||||
@@ -4242,7 +4334,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e9552f850d5f0964a4e4d0bf306459ac29323ddfbae05e35a7c0d35cb0803cc5"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"itertools 0.12.1",
|
||||
"itertools 0.10.5",
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn 2.0.52",
|
||||
@@ -4330,7 +4422,7 @@ dependencies = [
|
||||
"rsa",
|
||||
"rstest",
|
||||
"rustc-hash",
|
||||
"rustls 0.23.16",
|
||||
"rustls 0.23.7",
|
||||
"rustls-native-certs 0.8.0",
|
||||
"rustls-pemfile 2.1.1",
|
||||
"scopeguard",
|
||||
@@ -4341,8 +4433,6 @@ dependencies = [
|
||||
"smallvec",
|
||||
"smol_str",
|
||||
"socket2",
|
||||
"strum",
|
||||
"strum_macros",
|
||||
"subtle",
|
||||
"thiserror",
|
||||
"tikv-jemalloc-ctl",
|
||||
@@ -5016,22 +5106,23 @@ dependencies = [
|
||||
"log",
|
||||
"ring",
|
||||
"rustls-pki-types",
|
||||
"rustls-webpki 0.102.8",
|
||||
"rustls-webpki 0.102.2",
|
||||
"subtle",
|
||||
"zeroize",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rustls"
|
||||
version = "0.23.16"
|
||||
version = "0.23.7"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "eee87ff5d9b36712a58574e12e9f0ea80f915a5b0ac518d322b24a465617925e"
|
||||
checksum = "ebbbdb961df0ad3f2652da8f3fdc4b36122f568f968f45ad3316f26c025c677b"
|
||||
dependencies = [
|
||||
"aws-lc-rs",
|
||||
"log",
|
||||
"once_cell",
|
||||
"ring",
|
||||
"rustls-pki-types",
|
||||
"rustls-webpki 0.102.8",
|
||||
"rustls-webpki 0.102.2",
|
||||
"subtle",
|
||||
"zeroize",
|
||||
]
|
||||
@@ -5111,10 +5202,11 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "rustls-webpki"
|
||||
version = "0.102.8"
|
||||
version = "0.102.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "64ca1bc8749bd4cf37b5ce386cc146580777b4e8572c7b97baf22c83f444bee9"
|
||||
checksum = "faaa0a62740bedb9b2ef5afa303da42764c012f743917351dc9a237ea1663610"
|
||||
dependencies = [
|
||||
"aws-lc-rs",
|
||||
"ring",
|
||||
"rustls-pki-types",
|
||||
"untrusted",
|
||||
@@ -5731,7 +5823,6 @@ dependencies = [
|
||||
"once_cell",
|
||||
"parking_lot 0.12.1",
|
||||
"prost",
|
||||
"rustls 0.23.16",
|
||||
"tokio",
|
||||
"tonic",
|
||||
"tonic-build",
|
||||
@@ -5814,7 +5905,7 @@ dependencies = [
|
||||
"postgres_ffi",
|
||||
"remote_storage",
|
||||
"reqwest 0.12.4",
|
||||
"rustls 0.23.16",
|
||||
"rustls 0.23.7",
|
||||
"rustls-native-certs 0.8.0",
|
||||
"serde",
|
||||
"serde_json",
|
||||
@@ -6247,7 +6338,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "04fb792ccd6bbcd4bba408eb8a292f70fc4a3589e5d793626f45190e6454b6ab"
|
||||
dependencies = [
|
||||
"ring",
|
||||
"rustls 0.23.16",
|
||||
"rustls 0.23.7",
|
||||
"tokio",
|
||||
"tokio-postgres",
|
||||
"tokio-rustls 0.26.0",
|
||||
@@ -6281,7 +6372,7 @@ version = "0.26.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "0c7bc40d0e5a97695bb96e27995cd3a08538541b0a846f65bba7a359f36700d4"
|
||||
dependencies = [
|
||||
"rustls 0.23.16",
|
||||
"rustls 0.23.7",
|
||||
"rustls-pki-types",
|
||||
"tokio",
|
||||
]
|
||||
@@ -6690,7 +6781,7 @@ dependencies = [
|
||||
"base64 0.22.1",
|
||||
"log",
|
||||
"once_cell",
|
||||
"rustls 0.23.16",
|
||||
"rustls 0.23.7",
|
||||
"rustls-pki-types",
|
||||
"url",
|
||||
"webpki-roots 0.26.1",
|
||||
@@ -6894,7 +6985,7 @@ name = "walproposer"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"bindgen",
|
||||
"bindgen 0.70.1",
|
||||
"postgres_ffi",
|
||||
"utils",
|
||||
]
|
||||
@@ -7069,6 +7160,18 @@ dependencies = [
|
||||
"rustls-pki-types",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "which"
|
||||
version = "4.4.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "87ba24419a2078cd2b0f2ede2691b6c66d8e47836da3b6db8265ebad47afbfc7"
|
||||
dependencies = [
|
||||
"either",
|
||||
"home",
|
||||
"once_cell",
|
||||
"rustix",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "whoami"
|
||||
version = "1.5.1"
|
||||
@@ -7328,7 +7431,7 @@ dependencies = [
|
||||
"hyper-util",
|
||||
"indexmap 1.9.3",
|
||||
"indexmap 2.0.1",
|
||||
"itertools 0.12.1",
|
||||
"itertools 0.10.5",
|
||||
"lazy_static",
|
||||
"libc",
|
||||
"log",
|
||||
@@ -7349,7 +7452,8 @@ dependencies = [
|
||||
"regex-automata 0.4.3",
|
||||
"regex-syntax 0.8.2",
|
||||
"reqwest 0.12.4",
|
||||
"rustls 0.23.16",
|
||||
"rustls 0.23.7",
|
||||
"rustls-webpki 0.102.2",
|
||||
"scopeguard",
|
||||
"serde",
|
||||
"serde_json",
|
||||
|
||||
@@ -143,7 +143,7 @@ reqwest-retry = "0.5"
|
||||
routerify = "3"
|
||||
rpds = "0.13"
|
||||
rustc-hash = "1.1.0"
|
||||
rustls = { version = "0.23.16", default-features = false }
|
||||
rustls = "0.23"
|
||||
rustls-pemfile = "2"
|
||||
scopeguard = "1.1"
|
||||
sysinfo = "0.29.2"
|
||||
@@ -174,7 +174,7 @@ tokio = { version = "1.17", features = ["macros"] }
|
||||
tokio-epoll-uring = { git = "https://github.com/neondatabase/tokio-epoll-uring.git" , branch = "main" }
|
||||
tokio-io-timeout = "1.2.0"
|
||||
tokio-postgres-rustls = "0.12.0"
|
||||
tokio-rustls = { version = "0.26.0", default-features = false, features = ["tls12", "ring"]}
|
||||
tokio-rustls = "0.26"
|
||||
tokio-stream = "0.1"
|
||||
tokio-tar = "0.3"
|
||||
tokio-util = { version = "0.7.10", features = ["io", "rt"] }
|
||||
|
||||
@@ -431,11 +431,14 @@ COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
|
||||
COPY compute/patches/rum.patch /rum.patch
|
||||
|
||||
# supports v17 since https://github.com/postgrespro/rum/commit/cb1edffc57736cd2a4455f8d0feab0d69928da25
|
||||
# doesn't use releases since 1.3.13 - Sep 19, 2022
|
||||
# use latest commit from the master branch
|
||||
RUN wget https://github.com/postgrespro/rum/archive/cb1edffc57736cd2a4455f8d0feab0d69928da25.tar.gz -O rum.tar.gz && \
|
||||
echo "65e0a752e99f4c3226400c9b899f997049e93503db8bf5c8072efa136d32fd83 rum.tar.gz" | sha256sum --check && \
|
||||
# maybe version-specific
|
||||
# support for v17 is unknown
|
||||
# last release 1.3.13 - Sep 19, 2022
|
||||
RUN case "${PG_VERSION}" in "v17") \
|
||||
echo "v17 extensions are not supported yet. Quit" && exit 0;; \
|
||||
esac && \
|
||||
wget https://github.com/postgrespro/rum/archive/refs/tags/1.3.13.tar.gz -O rum.tar.gz && \
|
||||
echo "6ab370532c965568df6210bd844ac6ba649f53055e48243525b0b7e5c4d69a7d rum.tar.gz" | sha256sum --check && \
|
||||
mkdir rum-src && cd rum-src && tar xzf ../rum.tar.gz --strip-components=1 -C . && \
|
||||
patch -p1 < /rum.patch && \
|
||||
make -j $(getconf _NPROCESSORS_ONLN) PG_CONFIG=/usr/local/pgsql/bin/pg_config USE_PGXS=1 && \
|
||||
@@ -956,31 +959,21 @@ RUN apt-get install -y protobuf-compiler && \
|
||||
#
|
||||
#########################################################################################
|
||||
|
||||
FROM rust-extensions-build-pgrx12 AS pg-jsonschema-pg-build
|
||||
FROM rust-extensions-build AS pg-jsonschema-pg-build
|
||||
ARG PG_VERSION
|
||||
# version 0.3.3 supports v17
|
||||
# last release v0.3.3 - Oct 16, 2024
|
||||
#
|
||||
# there were no breaking changes
|
||||
# so we can use the same version for all postgres versions
|
||||
RUN case "${PG_VERSION}" in \
|
||||
"v14" | "v15" | "v16" | "v17") \
|
||||
export PG_JSONSCHEMA_VERSION=0.3.3 \
|
||||
export PG_JSONSCHEMA_CHECKSUM=40c2cffab4187e0233cb8c3bde013be92218c282f95f4469c5282f6b30d64eac \
|
||||
;; \
|
||||
*) \
|
||||
echo "unexpected PostgreSQL version" && exit 1 \
|
||||
;; \
|
||||
|
||||
RUN case "${PG_VERSION}" in "v17") \
|
||||
echo "pg_jsonschema does not yet have a release that supports pg17" && exit 0;; \
|
||||
esac && \
|
||||
wget https://github.com/supabase/pg_jsonschema/archive/refs/tags/v${PG_JSONSCHEMA_VERSION}.tar.gz -O pg_jsonschema.tar.gz && \
|
||||
echo "${PG_JSONSCHEMA_CHECKSUM} pg_jsonschema.tar.gz" | sha256sum --check && \
|
||||
wget https://github.com/supabase/pg_jsonschema/archive/refs/tags/v0.3.1.tar.gz -O pg_jsonschema.tar.gz && \
|
||||
echo "61df3db1ed83cf24f6aa39c826f8818bfa4f0bd33b587fd6b2b1747985642297 pg_jsonschema.tar.gz" | sha256sum --check && \
|
||||
mkdir pg_jsonschema-src && cd pg_jsonschema-src && tar xzf ../pg_jsonschema.tar.gz --strip-components=1 -C . && \
|
||||
# see commit 252b3685a27a0f4c31a0f91e983c6314838e89e8
|
||||
# `unsafe-postgres` feature allows to build pgx extensions
|
||||
# against postgres forks that decided to change their ABI name (like us).
|
||||
# With that we can build extensions without forking them and using stock
|
||||
# pgx. As this feature is new few manual version bumps were required.
|
||||
sed -i 's/pgrx = "0.12.6"/pgrx = { version = "0.12.6", features = [ "unsafe-postgres" ] }/g' Cargo.toml && \
|
||||
sed -i 's/pgrx = "0.11.3"/pgrx = { version = "0.11.3", features = [ "unsafe-postgres" ] }/g' Cargo.toml && \
|
||||
cargo pgrx install --release && \
|
||||
echo "trusted = true" >> /usr/local/pgsql/share/extension/pg_jsonschema.control
|
||||
|
||||
@@ -991,27 +984,16 @@ RUN case "${PG_VERSION}" in \
|
||||
#
|
||||
#########################################################################################
|
||||
|
||||
FROM rust-extensions-build-pgrx12 AS pg-graphql-pg-build
|
||||
FROM rust-extensions-build AS pg-graphql-pg-build
|
||||
ARG PG_VERSION
|
||||
|
||||
# version 1.5.9 supports v17
|
||||
# last release v1.5.9 - Oct 16, 2024
|
||||
#
|
||||
# there were no breaking changes
|
||||
# so we can use the same version for all postgres versions
|
||||
RUN case "${PG_VERSION}" in \
|
||||
"v14" | "v15" | "v16" | "v17") \
|
||||
export PG_GRAPHQL_VERSION=1.5.9 \
|
||||
export PG_GRAPHQL_CHECKSUM=cf768385a41278be1333472204fc0328118644ae443182cf52f7b9b23277e497 \
|
||||
;; \
|
||||
*) \
|
||||
echo "unexpected PostgreSQL version" && exit 1 \
|
||||
;; \
|
||||
RUN case "${PG_VERSION}" in "v17") \
|
||||
echo "pg_graphql does not yet have a release that supports pg17 as of now" && exit 0;; \
|
||||
esac && \
|
||||
wget https://github.com/supabase/pg_graphql/archive/refs/tags/v${PG_GRAPHQL_VERSION}.tar.gz -O pg_graphql.tar.gz && \
|
||||
echo "${PG_GRAPHQL_CHECKSUM} pg_graphql.tar.gz" | sha256sum --check && \
|
||||
wget https://github.com/supabase/pg_graphql/archive/refs/tags/v1.5.7.tar.gz -O pg_graphql.tar.gz && \
|
||||
echo "2b3e567a5b31019cb97ae0e33263c1bcc28580be5a444ac4c8ece5c4be2aea41 pg_graphql.tar.gz" | sha256sum --check && \
|
||||
mkdir pg_graphql-src && cd pg_graphql-src && tar xzf ../pg_graphql.tar.gz --strip-components=1 -C . && \
|
||||
sed -i 's/pgrx = "=0.12.6"/pgrx = { version = "0.12.6", features = [ "unsafe-postgres" ] }/g' Cargo.toml && \
|
||||
sed -i 's/pgrx = "=0.11.3"/pgrx = { version = "0.11.3", features = [ "unsafe-postgres" ] }/g' Cargo.toml && \
|
||||
cargo pgrx install --release && \
|
||||
# it's needed to enable extension because it uses untrusted C language
|
||||
sed -i 's/superuser = false/superuser = true/g' /usr/local/pgsql/share/extension/pg_graphql.control && \
|
||||
@@ -1024,13 +1006,15 @@ RUN case "${PG_VERSION}" in \
|
||||
#
|
||||
#########################################################################################
|
||||
|
||||
FROM rust-extensions-build-pgrx12 AS pg-tiktoken-pg-build
|
||||
FROM rust-extensions-build AS pg-tiktoken-pg-build
|
||||
ARG PG_VERSION
|
||||
|
||||
# doesn't use releases
|
||||
# 9118dd4549b7d8c0bbc98e04322499f7bf2fa6f7 - on Oct 29, 2024
|
||||
RUN wget https://github.com/kelvich/pg_tiktoken/archive/9118dd4549b7d8c0bbc98e04322499f7bf2fa6f7.tar.gz -O pg_tiktoken.tar.gz && \
|
||||
echo "a5bc447e7920ee149d3c064b8b9f0086c0e83939499753178f7d35788416f628 pg_tiktoken.tar.gz" | sha256sum --check && \
|
||||
# 26806147b17b60763039c6a6878884c41a262318 made on 26/09/2023
|
||||
RUN case "${PG_VERSION}" in "v17") \
|
||||
echo "pg_tiktoken does not have versions, nor support for pg17" && exit 0;; \
|
||||
esac && \
|
||||
wget https://github.com/kelvich/pg_tiktoken/archive/26806147b17b60763039c6a6878884c41a262318.tar.gz -O pg_tiktoken.tar.gz && \
|
||||
echo "e64e55aaa38c259512d3e27c572da22c4637418cf124caba904cd50944e5004e pg_tiktoken.tar.gz" | sha256sum --check && \
|
||||
mkdir pg_tiktoken-src && cd pg_tiktoken-src && tar xzf ../pg_tiktoken.tar.gz --strip-components=1 -C . && \
|
||||
# TODO update pgrx version in the pg_tiktoken repo and remove this line
|
||||
sed -i 's/pgrx = { version = "=0.10.2",/pgrx = { version = "0.11.3",/g' Cargo.toml && \
|
||||
@@ -1048,8 +1032,6 @@ RUN wget https://github.com/kelvich/pg_tiktoken/archive/9118dd4549b7d8c0bbc98e04
|
||||
FROM rust-extensions-build AS pg-pgx-ulid-build
|
||||
ARG PG_VERSION
|
||||
|
||||
# doesn't support v17 yet
|
||||
# https://github.com/pksunkara/pgx_ulid/pull/52
|
||||
RUN case "${PG_VERSION}" in "v17") \
|
||||
echo "pgx_ulid does not support pg17 as of the latest version (0.1.5)" && exit 0;; \
|
||||
esac && \
|
||||
@@ -1067,16 +1049,16 @@ RUN case "${PG_VERSION}" in "v17") \
|
||||
#
|
||||
#########################################################################################
|
||||
|
||||
FROM rust-extensions-build-pgrx12 AS pg-session-jwt-build
|
||||
FROM rust-extensions-build AS pg-session-jwt-build
|
||||
ARG PG_VERSION
|
||||
|
||||
# NOTE: local_proxy depends on the version of pg_session_jwt
|
||||
# Do not update without approve from proxy team
|
||||
# Make sure the version is reflected in proxy/src/serverless/local_conn_pool.rs
|
||||
RUN wget https://github.com/neondatabase/pg_session_jwt/archive/refs/tags/v0.1.2-v17.tar.gz -O pg_session_jwt.tar.gz && \
|
||||
echo "c8ecbed9cb8c6441bce5134a176002b043018adf9d05a08e457dda233090a86e pg_session_jwt.tar.gz" | sha256sum --check && \
|
||||
RUN case "${PG_VERSION}" in "v17") \
|
||||
echo "pg_session_jwt does not yet have a release that supports pg17" && exit 0;; \
|
||||
esac && \
|
||||
wget https://github.com/neondatabase/pg_session_jwt/archive/e1310b08ba51377a19e0559e4d1194883b9b2ba2.tar.gz -O pg_session_jwt.tar.gz && \
|
||||
echo "837932a077888d5545fd54b0abcc79e5f8e37017c2769a930afc2f5c94df6f4e pg_session_jwt.tar.gz" | sha256sum --check && \
|
||||
mkdir pg_session_jwt-src && cd pg_session_jwt-src && tar xzf ../pg_session_jwt.tar.gz --strip-components=1 -C . && \
|
||||
sed -i 's/pgrx = "0.12.6"/pgrx = { version = "=0.12.6", features = [ "unsafe-postgres" ] }/g' Cargo.toml && \
|
||||
sed -i 's/pgrx = "=0.11.3"/pgrx = { version = "=0.11.3", features = [ "unsafe-postgres" ] }/g' Cargo.toml && \
|
||||
cargo pgrx install --release
|
||||
|
||||
#########################################################################################
|
||||
|
||||
@@ -110,23 +110,6 @@ static MAXRSS_KB: Lazy<IntGauge> = Lazy::new(|| {
|
||||
pub const DISK_FSYNC_SECONDS_BUCKETS: &[f64] =
|
||||
&[0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0, 10.0, 30.0];
|
||||
|
||||
/// Constructs histogram buckets that are powers of two starting at 1 (i.e. 2^0), covering the end
|
||||
/// points. For example, passing start=5,end=20 yields 4,8,16,32 as does start=4,end=32.
|
||||
pub fn pow2_buckets(start: usize, end: usize) -> Vec<f64> {
|
||||
assert_ne!(start, 0);
|
||||
assert!(start <= end);
|
||||
let start = match start.checked_next_power_of_two() {
|
||||
Some(n) if n == start => n, // start already power of two
|
||||
Some(n) => n >> 1, // power of two below start
|
||||
None => panic!("start too large"),
|
||||
};
|
||||
let end = end.checked_next_power_of_two().expect("end too large");
|
||||
std::iter::successors(Some(start), |n| n.checked_mul(2))
|
||||
.take_while(|n| n <= &end)
|
||||
.map(|n| n as f64)
|
||||
.collect()
|
||||
}
|
||||
|
||||
pub struct BuildInfo {
|
||||
pub revision: &'static str,
|
||||
pub build_tag: &'static str,
|
||||
@@ -612,67 +595,3 @@ where
|
||||
self.dec.collect_into(metadata, labels, name, &mut enc.0)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
const POW2_BUCKETS_MAX: usize = 1 << (usize::BITS - 1);
|
||||
|
||||
#[test]
|
||||
fn pow2_buckets_cases() {
|
||||
assert_eq!(pow2_buckets(1, 1), vec![1.0]);
|
||||
assert_eq!(pow2_buckets(1, 2), vec![1.0, 2.0]);
|
||||
assert_eq!(pow2_buckets(1, 3), vec![1.0, 2.0, 4.0]);
|
||||
assert_eq!(pow2_buckets(1, 4), vec![1.0, 2.0, 4.0]);
|
||||
assert_eq!(pow2_buckets(1, 5), vec![1.0, 2.0, 4.0, 8.0]);
|
||||
assert_eq!(pow2_buckets(1, 6), vec![1.0, 2.0, 4.0, 8.0]);
|
||||
assert_eq!(pow2_buckets(1, 7), vec![1.0, 2.0, 4.0, 8.0]);
|
||||
assert_eq!(pow2_buckets(1, 8), vec![1.0, 2.0, 4.0, 8.0]);
|
||||
assert_eq!(
|
||||
pow2_buckets(1, 200),
|
||||
vec![1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 64.0, 128.0, 256.0]
|
||||
);
|
||||
|
||||
assert_eq!(pow2_buckets(1, 8), vec![1.0, 2.0, 4.0, 8.0]);
|
||||
assert_eq!(pow2_buckets(2, 8), vec![2.0, 4.0, 8.0]);
|
||||
assert_eq!(pow2_buckets(3, 8), vec![2.0, 4.0, 8.0]);
|
||||
assert_eq!(pow2_buckets(4, 8), vec![4.0, 8.0]);
|
||||
assert_eq!(pow2_buckets(5, 8), vec![4.0, 8.0]);
|
||||
assert_eq!(pow2_buckets(6, 8), vec![4.0, 8.0]);
|
||||
assert_eq!(pow2_buckets(7, 8), vec![4.0, 8.0]);
|
||||
assert_eq!(pow2_buckets(8, 8), vec![8.0]);
|
||||
assert_eq!(pow2_buckets(20, 200), vec![16.0, 32.0, 64.0, 128.0, 256.0]);
|
||||
|
||||
// Largest valid values.
|
||||
assert_eq!(
|
||||
pow2_buckets(1, POW2_BUCKETS_MAX).len(),
|
||||
usize::BITS as usize
|
||||
);
|
||||
assert_eq!(pow2_buckets(POW2_BUCKETS_MAX, POW2_BUCKETS_MAX).len(), 1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[should_panic]
|
||||
fn pow2_buckets_zero_start() {
|
||||
pow2_buckets(0, 1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[should_panic]
|
||||
fn pow2_buckets_end_lt_start() {
|
||||
pow2_buckets(2, 1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[should_panic]
|
||||
fn pow2_buckets_end_overflow_min() {
|
||||
pow2_buckets(1, POW2_BUCKETS_MAX + 1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[should_panic]
|
||||
fn pow2_buckets_end_overflow_max() {
|
||||
pow2_buckets(1, usize::MAX);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
use once_cell::sync::Lazy;
|
||||
use postgres_backend::{AuthType, Handler, PostgresBackend, QueryError};
|
||||
use pq_proto::{BeMessage, RowDescriptor};
|
||||
use rustls::crypto::ring;
|
||||
use rustls::crypto::aws_lc_rs;
|
||||
use std::io::Cursor;
|
||||
use std::sync::Arc;
|
||||
use tokio::io::{AsyncRead, AsyncWrite};
|
||||
@@ -94,7 +94,7 @@ async fn simple_select_ssl() {
|
||||
let (client_sock, server_sock) = make_tcp_pair().await;
|
||||
|
||||
let server_cfg =
|
||||
rustls::ServerConfig::builder_with_provider(Arc::new(ring::default_provider()))
|
||||
rustls::ServerConfig::builder_with_provider(Arc::new(aws_lc_rs::default_provider()))
|
||||
.with_safe_default_protocol_versions()
|
||||
.expect("aws_lc_rs should support the default protocol versions")
|
||||
.with_no_client_auth()
|
||||
@@ -110,7 +110,7 @@ async fn simple_select_ssl() {
|
||||
});
|
||||
|
||||
let client_cfg =
|
||||
rustls::ClientConfig::builder_with_provider(Arc::new(ring::default_provider()))
|
||||
rustls::ClientConfig::builder_with_provider(Arc::new(aws_lc_rs::default_provider()))
|
||||
.with_safe_default_protocol_versions()
|
||||
.expect("aws_lc_rs should support the default protocol versions")
|
||||
.with_root_certificates({
|
||||
|
||||
@@ -1,970 +1 @@
|
||||
//! This module contains logic for decoding and interpreting
|
||||
//! raw bytes which represent a raw Postgres WAL record.
|
||||
|
||||
use crate::models::*;
|
||||
use bytes::{Buf, Bytes, BytesMut};
|
||||
use pageserver_api::key::rel_block_to_key;
|
||||
use pageserver_api::record::NeonWalRecord;
|
||||
use pageserver_api::reltag::{RelTag, SlruKind};
|
||||
use pageserver_api::shard::ShardIdentity;
|
||||
use pageserver_api::value::Value;
|
||||
use postgres_ffi::relfile_utils::VISIBILITYMAP_FORKNUM;
|
||||
use postgres_ffi::walrecord::*;
|
||||
use postgres_ffi::{page_is_new, page_set_lsn, pg_constants, BLCKSZ};
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
impl InterpretedWalRecord {
|
||||
/// Decode and interpreted raw bytes which represent one Postgres WAL record.
|
||||
/// Data blocks which do not match the provided shard identity are filtered out.
|
||||
/// Shard 0 is a special case since it tracks all relation sizes. We only give it
|
||||
/// the keys that are being written as that is enough for updating relation sizes.
|
||||
pub fn from_bytes_filtered(
|
||||
buf: Bytes,
|
||||
shard: &ShardIdentity,
|
||||
lsn: Lsn,
|
||||
pg_version: u32,
|
||||
) -> anyhow::Result<InterpretedWalRecord> {
|
||||
let mut decoded = DecodedWALRecord::default();
|
||||
decode_wal_record(buf, &mut decoded, pg_version)?;
|
||||
|
||||
let flush_uncommitted = if decoded.is_dbase_create_copy(pg_version) {
|
||||
FlushUncommittedRecords::Yes
|
||||
} else {
|
||||
FlushUncommittedRecords::No
|
||||
};
|
||||
|
||||
let metadata_record = MetadataRecord::from_decoded(&decoded, lsn, pg_version)?;
|
||||
|
||||
let mut blocks = Vec::default();
|
||||
for blk in decoded.blocks.iter() {
|
||||
let rel = RelTag {
|
||||
spcnode: blk.rnode_spcnode,
|
||||
dbnode: blk.rnode_dbnode,
|
||||
relnode: blk.rnode_relnode,
|
||||
forknum: blk.forknum,
|
||||
};
|
||||
|
||||
let key = rel_block_to_key(rel, blk.blkno);
|
||||
|
||||
if !key.is_valid_key_on_write_path() {
|
||||
anyhow::bail!("Unsupported key decoded at LSN {}: {}", lsn, key);
|
||||
}
|
||||
|
||||
let key_is_local = shard.is_key_local(&key);
|
||||
|
||||
tracing::debug!(
|
||||
lsn=%lsn,
|
||||
key=%key,
|
||||
"ingest: shard decision {}",
|
||||
if !key_is_local { "drop" } else { "keep" },
|
||||
);
|
||||
|
||||
if !key_is_local {
|
||||
if shard.is_shard_zero() {
|
||||
// Shard 0 tracks relation sizes. Although we will not store this block, we will observe
|
||||
// its blkno in case it implicitly extends a relation.
|
||||
blocks.push((key.to_compact(), None));
|
||||
}
|
||||
|
||||
continue;
|
||||
}
|
||||
|
||||
// Instead of storing full-page-image WAL record,
|
||||
// it is better to store extracted image: we can skip wal-redo
|
||||
// in this case. Also some FPI records may contain multiple (up to 32) pages,
|
||||
// so them have to be copied multiple times.
|
||||
//
|
||||
let value = if blk.apply_image
|
||||
&& blk.has_image
|
||||
&& decoded.xl_rmid == pg_constants::RM_XLOG_ID
|
||||
&& (decoded.xl_info == pg_constants::XLOG_FPI
|
||||
|| decoded.xl_info == pg_constants::XLOG_FPI_FOR_HINT)
|
||||
// compression of WAL is not yet supported: fall back to storing the original WAL record
|
||||
&& !postgres_ffi::bkpimage_is_compressed(blk.bimg_info, pg_version)
|
||||
// do not materialize null pages because them most likely be soon replaced with real data
|
||||
&& blk.bimg_len != 0
|
||||
{
|
||||
// Extract page image from FPI record
|
||||
let img_len = blk.bimg_len as usize;
|
||||
let img_offs = blk.bimg_offset as usize;
|
||||
let mut image = BytesMut::with_capacity(BLCKSZ as usize);
|
||||
// TODO(vlad): skip the copy
|
||||
image.extend_from_slice(&decoded.record[img_offs..img_offs + img_len]);
|
||||
|
||||
if blk.hole_length != 0 {
|
||||
let tail = image.split_off(blk.hole_offset as usize);
|
||||
image.resize(image.len() + blk.hole_length as usize, 0u8);
|
||||
image.unsplit(tail);
|
||||
}
|
||||
//
|
||||
// Match the logic of XLogReadBufferForRedoExtended:
|
||||
// The page may be uninitialized. If so, we can't set the LSN because
|
||||
// that would corrupt the page.
|
||||
//
|
||||
if !page_is_new(&image) {
|
||||
page_set_lsn(&mut image, lsn)
|
||||
}
|
||||
assert_eq!(image.len(), BLCKSZ as usize);
|
||||
|
||||
Value::Image(image.freeze())
|
||||
} else {
|
||||
Value::WalRecord(NeonWalRecord::Postgres {
|
||||
will_init: blk.will_init || blk.apply_image,
|
||||
rec: decoded.record.clone(),
|
||||
})
|
||||
};
|
||||
|
||||
blocks.push((key.to_compact(), Some(value)));
|
||||
}
|
||||
|
||||
Ok(InterpretedWalRecord {
|
||||
metadata_record,
|
||||
blocks,
|
||||
lsn,
|
||||
flush_uncommitted,
|
||||
xid: decoded.xl_xid,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl MetadataRecord {
|
||||
fn from_decoded(
|
||||
decoded: &DecodedWALRecord,
|
||||
lsn: Lsn,
|
||||
pg_version: u32,
|
||||
) -> anyhow::Result<Option<MetadataRecord>> {
|
||||
// Note: this doesn't actually copy the bytes since
|
||||
// the [`Bytes`] type implements it via a level of indirection.
|
||||
let mut buf = decoded.record.clone();
|
||||
buf.advance(decoded.main_data_offset);
|
||||
|
||||
match decoded.xl_rmid {
|
||||
pg_constants::RM_HEAP_ID | pg_constants::RM_HEAP2_ID => {
|
||||
Self::decode_heapam_record(&mut buf, decoded, pg_version)
|
||||
}
|
||||
pg_constants::RM_NEON_ID => Self::decode_neonmgr_record(&mut buf, decoded, pg_version),
|
||||
// Handle other special record types
|
||||
pg_constants::RM_SMGR_ID => Self::decode_smgr_record(&mut buf, decoded),
|
||||
pg_constants::RM_DBASE_ID => Self::decode_dbase_record(&mut buf, decoded, pg_version),
|
||||
pg_constants::RM_TBLSPC_ID => {
|
||||
tracing::trace!("XLOG_TBLSPC_CREATE/DROP is not handled yet");
|
||||
Ok(None)
|
||||
}
|
||||
pg_constants::RM_CLOG_ID => Self::decode_clog_record(&mut buf, decoded, pg_version),
|
||||
pg_constants::RM_XACT_ID => Self::decode_xact_record(&mut buf, decoded, lsn),
|
||||
pg_constants::RM_MULTIXACT_ID => {
|
||||
Self::decode_multixact_record(&mut buf, decoded, pg_version)
|
||||
}
|
||||
pg_constants::RM_RELMAP_ID => Self::decode_relmap_record(&mut buf, decoded),
|
||||
// This is an odd duck. It needs to go to all shards.
|
||||
// Since it uses the checkpoint image (that's initialized from CHECKPOINT_KEY
|
||||
// in WalIngest::new), we have to send the whole DecodedWalRecord::record to
|
||||
// the pageserver and decode it there.
|
||||
//
|
||||
// Alternatively, one can make the checkpoint part of the subscription protocol
|
||||
// to the pageserver. This should work fine, but can be done at a later point.
|
||||
pg_constants::RM_XLOG_ID => Self::decode_xlog_record(&mut buf, decoded, lsn),
|
||||
pg_constants::RM_LOGICALMSG_ID => {
|
||||
Self::decode_logical_message_record(&mut buf, decoded)
|
||||
}
|
||||
pg_constants::RM_STANDBY_ID => Self::decode_standby_record(&mut buf, decoded),
|
||||
pg_constants::RM_REPLORIGIN_ID => Self::decode_replorigin_record(&mut buf, decoded),
|
||||
_unexpected => {
|
||||
// TODO: consider failing here instead of blindly doing something without
|
||||
// understanding the protocol
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn decode_heapam_record(
|
||||
buf: &mut Bytes,
|
||||
decoded: &DecodedWALRecord,
|
||||
pg_version: u32,
|
||||
) -> anyhow::Result<Option<MetadataRecord>> {
|
||||
// Handle VM bit updates that are implicitly part of heap records.
|
||||
|
||||
// First, look at the record to determine which VM bits need
|
||||
// to be cleared. If either of these variables is set, we
|
||||
// need to clear the corresponding bits in the visibility map.
|
||||
let mut new_heap_blkno: Option<u32> = None;
|
||||
let mut old_heap_blkno: Option<u32> = None;
|
||||
let mut flags = pg_constants::VISIBILITYMAP_VALID_BITS;
|
||||
|
||||
match pg_version {
|
||||
14 => {
|
||||
if decoded.xl_rmid == pg_constants::RM_HEAP_ID {
|
||||
let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
|
||||
|
||||
if info == pg_constants::XLOG_HEAP_INSERT {
|
||||
let xlrec = v14::XlHeapInsert::decode(buf);
|
||||
assert_eq!(0, buf.remaining());
|
||||
if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
|
||||
new_heap_blkno = Some(decoded.blocks[0].blkno);
|
||||
}
|
||||
} else if info == pg_constants::XLOG_HEAP_DELETE {
|
||||
let xlrec = v14::XlHeapDelete::decode(buf);
|
||||
if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
|
||||
new_heap_blkno = Some(decoded.blocks[0].blkno);
|
||||
}
|
||||
} else if info == pg_constants::XLOG_HEAP_UPDATE
|
||||
|| info == pg_constants::XLOG_HEAP_HOT_UPDATE
|
||||
{
|
||||
let xlrec = v14::XlHeapUpdate::decode(buf);
|
||||
// the size of tuple data is inferred from the size of the record.
|
||||
// we can't validate the remaining number of bytes without parsing
|
||||
// the tuple data.
|
||||
if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
|
||||
old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
|
||||
}
|
||||
if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
|
||||
// PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
|
||||
// non-HOT update where the new tuple goes to different page than
|
||||
// the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
|
||||
// set.
|
||||
new_heap_blkno = Some(decoded.blocks[0].blkno);
|
||||
}
|
||||
} else if info == pg_constants::XLOG_HEAP_LOCK {
|
||||
let xlrec = v14::XlHeapLock::decode(buf);
|
||||
if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
|
||||
old_heap_blkno = Some(decoded.blocks[0].blkno);
|
||||
flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
|
||||
}
|
||||
}
|
||||
} else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID {
|
||||
let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
|
||||
if info == pg_constants::XLOG_HEAP2_MULTI_INSERT {
|
||||
let xlrec = v14::XlHeapMultiInsert::decode(buf);
|
||||
|
||||
let offset_array_len =
|
||||
if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 {
|
||||
// the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set
|
||||
0
|
||||
} else {
|
||||
size_of::<u16>() * xlrec.ntuples as usize
|
||||
};
|
||||
assert_eq!(offset_array_len, buf.remaining());
|
||||
|
||||
if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
|
||||
new_heap_blkno = Some(decoded.blocks[0].blkno);
|
||||
}
|
||||
} else if info == pg_constants::XLOG_HEAP2_LOCK_UPDATED {
|
||||
let xlrec = v14::XlHeapLockUpdated::decode(buf);
|
||||
if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
|
||||
old_heap_blkno = Some(decoded.blocks[0].blkno);
|
||||
flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
anyhow::bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid);
|
||||
}
|
||||
}
|
||||
15 => {
|
||||
if decoded.xl_rmid == pg_constants::RM_HEAP_ID {
|
||||
let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
|
||||
|
||||
if info == pg_constants::XLOG_HEAP_INSERT {
|
||||
let xlrec = v15::XlHeapInsert::decode(buf);
|
||||
assert_eq!(0, buf.remaining());
|
||||
if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
|
||||
new_heap_blkno = Some(decoded.blocks[0].blkno);
|
||||
}
|
||||
} else if info == pg_constants::XLOG_HEAP_DELETE {
|
||||
let xlrec = v15::XlHeapDelete::decode(buf);
|
||||
if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
|
||||
new_heap_blkno = Some(decoded.blocks[0].blkno);
|
||||
}
|
||||
} else if info == pg_constants::XLOG_HEAP_UPDATE
|
||||
|| info == pg_constants::XLOG_HEAP_HOT_UPDATE
|
||||
{
|
||||
let xlrec = v15::XlHeapUpdate::decode(buf);
|
||||
// the size of tuple data is inferred from the size of the record.
|
||||
// we can't validate the remaining number of bytes without parsing
|
||||
// the tuple data.
|
||||
if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
|
||||
old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
|
||||
}
|
||||
if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
|
||||
// PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
|
||||
// non-HOT update where the new tuple goes to different page than
|
||||
// the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
|
||||
// set.
|
||||
new_heap_blkno = Some(decoded.blocks[0].blkno);
|
||||
}
|
||||
} else if info == pg_constants::XLOG_HEAP_LOCK {
|
||||
let xlrec = v15::XlHeapLock::decode(buf);
|
||||
if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
|
||||
old_heap_blkno = Some(decoded.blocks[0].blkno);
|
||||
flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
|
||||
}
|
||||
}
|
||||
} else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID {
|
||||
let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
|
||||
if info == pg_constants::XLOG_HEAP2_MULTI_INSERT {
|
||||
let xlrec = v15::XlHeapMultiInsert::decode(buf);
|
||||
|
||||
let offset_array_len =
|
||||
if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 {
|
||||
// the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set
|
||||
0
|
||||
} else {
|
||||
size_of::<u16>() * xlrec.ntuples as usize
|
||||
};
|
||||
assert_eq!(offset_array_len, buf.remaining());
|
||||
|
||||
if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
|
||||
new_heap_blkno = Some(decoded.blocks[0].blkno);
|
||||
}
|
||||
} else if info == pg_constants::XLOG_HEAP2_LOCK_UPDATED {
|
||||
let xlrec = v15::XlHeapLockUpdated::decode(buf);
|
||||
if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
|
||||
old_heap_blkno = Some(decoded.blocks[0].blkno);
|
||||
flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
anyhow::bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid);
|
||||
}
|
||||
}
|
||||
16 => {
|
||||
if decoded.xl_rmid == pg_constants::RM_HEAP_ID {
|
||||
let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
|
||||
|
||||
if info == pg_constants::XLOG_HEAP_INSERT {
|
||||
let xlrec = v16::XlHeapInsert::decode(buf);
|
||||
assert_eq!(0, buf.remaining());
|
||||
if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
|
||||
new_heap_blkno = Some(decoded.blocks[0].blkno);
|
||||
}
|
||||
} else if info == pg_constants::XLOG_HEAP_DELETE {
|
||||
let xlrec = v16::XlHeapDelete::decode(buf);
|
||||
if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
|
||||
new_heap_blkno = Some(decoded.blocks[0].blkno);
|
||||
}
|
||||
} else if info == pg_constants::XLOG_HEAP_UPDATE
|
||||
|| info == pg_constants::XLOG_HEAP_HOT_UPDATE
|
||||
{
|
||||
let xlrec = v16::XlHeapUpdate::decode(buf);
|
||||
// the size of tuple data is inferred from the size of the record.
|
||||
// we can't validate the remaining number of bytes without parsing
|
||||
// the tuple data.
|
||||
if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
|
||||
old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
|
||||
}
|
||||
if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
|
||||
// PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
|
||||
// non-HOT update where the new tuple goes to different page than
|
||||
// the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
|
||||
// set.
|
||||
new_heap_blkno = Some(decoded.blocks[0].blkno);
|
||||
}
|
||||
} else if info == pg_constants::XLOG_HEAP_LOCK {
|
||||
let xlrec = v16::XlHeapLock::decode(buf);
|
||||
if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
|
||||
old_heap_blkno = Some(decoded.blocks[0].blkno);
|
||||
flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
|
||||
}
|
||||
}
|
||||
} else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID {
|
||||
let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
|
||||
if info == pg_constants::XLOG_HEAP2_MULTI_INSERT {
|
||||
let xlrec = v16::XlHeapMultiInsert::decode(buf);
|
||||
|
||||
let offset_array_len =
|
||||
if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 {
|
||||
// the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set
|
||||
0
|
||||
} else {
|
||||
size_of::<u16>() * xlrec.ntuples as usize
|
||||
};
|
||||
assert_eq!(offset_array_len, buf.remaining());
|
||||
|
||||
if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
|
||||
new_heap_blkno = Some(decoded.blocks[0].blkno);
|
||||
}
|
||||
} else if info == pg_constants::XLOG_HEAP2_LOCK_UPDATED {
|
||||
let xlrec = v16::XlHeapLockUpdated::decode(buf);
|
||||
if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
|
||||
old_heap_blkno = Some(decoded.blocks[0].blkno);
|
||||
flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
anyhow::bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid);
|
||||
}
|
||||
}
|
||||
17 => {
|
||||
if decoded.xl_rmid == pg_constants::RM_HEAP_ID {
|
||||
let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
|
||||
|
||||
if info == pg_constants::XLOG_HEAP_INSERT {
|
||||
let xlrec = v17::XlHeapInsert::decode(buf);
|
||||
assert_eq!(0, buf.remaining());
|
||||
if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
|
||||
new_heap_blkno = Some(decoded.blocks[0].blkno);
|
||||
}
|
||||
} else if info == pg_constants::XLOG_HEAP_DELETE {
|
||||
let xlrec = v17::XlHeapDelete::decode(buf);
|
||||
if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
|
||||
new_heap_blkno = Some(decoded.blocks[0].blkno);
|
||||
}
|
||||
} else if info == pg_constants::XLOG_HEAP_UPDATE
|
||||
|| info == pg_constants::XLOG_HEAP_HOT_UPDATE
|
||||
{
|
||||
let xlrec = v17::XlHeapUpdate::decode(buf);
|
||||
// the size of tuple data is inferred from the size of the record.
|
||||
// we can't validate the remaining number of bytes without parsing
|
||||
// the tuple data.
|
||||
if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
|
||||
old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
|
||||
}
|
||||
if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
|
||||
// PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
|
||||
// non-HOT update where the new tuple goes to different page than
|
||||
// the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
|
||||
// set.
|
||||
new_heap_blkno = Some(decoded.blocks[0].blkno);
|
||||
}
|
||||
} else if info == pg_constants::XLOG_HEAP_LOCK {
|
||||
let xlrec = v17::XlHeapLock::decode(buf);
|
||||
if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
|
||||
old_heap_blkno = Some(decoded.blocks[0].blkno);
|
||||
flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
|
||||
}
|
||||
}
|
||||
} else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID {
|
||||
let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
|
||||
if info == pg_constants::XLOG_HEAP2_MULTI_INSERT {
|
||||
let xlrec = v17::XlHeapMultiInsert::decode(buf);
|
||||
|
||||
let offset_array_len =
|
||||
if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 {
|
||||
// the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set
|
||||
0
|
||||
} else {
|
||||
size_of::<u16>() * xlrec.ntuples as usize
|
||||
};
|
||||
assert_eq!(offset_array_len, buf.remaining());
|
||||
|
||||
if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
|
||||
new_heap_blkno = Some(decoded.blocks[0].blkno);
|
||||
}
|
||||
} else if info == pg_constants::XLOG_HEAP2_LOCK_UPDATED {
|
||||
let xlrec = v17::XlHeapLockUpdated::decode(buf);
|
||||
if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
|
||||
old_heap_blkno = Some(decoded.blocks[0].blkno);
|
||||
flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
anyhow::bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid);
|
||||
}
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
|
||||
if new_heap_blkno.is_some() || old_heap_blkno.is_some() {
|
||||
let vm_rel = RelTag {
|
||||
forknum: VISIBILITYMAP_FORKNUM,
|
||||
spcnode: decoded.blocks[0].rnode_spcnode,
|
||||
dbnode: decoded.blocks[0].rnode_dbnode,
|
||||
relnode: decoded.blocks[0].rnode_relnode,
|
||||
};
|
||||
|
||||
Ok(Some(MetadataRecord::Heapam(HeapamRecord::ClearVmBits(
|
||||
ClearVmBits {
|
||||
new_heap_blkno,
|
||||
old_heap_blkno,
|
||||
vm_rel,
|
||||
flags,
|
||||
},
|
||||
))))
|
||||
} else {
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
|
||||
fn decode_neonmgr_record(
|
||||
buf: &mut Bytes,
|
||||
decoded: &DecodedWALRecord,
|
||||
pg_version: u32,
|
||||
) -> anyhow::Result<Option<MetadataRecord>> {
|
||||
// Handle VM bit updates that are implicitly part of heap records.
|
||||
|
||||
// First, look at the record to determine which VM bits need
|
||||
// to be cleared. If either of these variables is set, we
|
||||
// need to clear the corresponding bits in the visibility map.
|
||||
let mut new_heap_blkno: Option<u32> = None;
|
||||
let mut old_heap_blkno: Option<u32> = None;
|
||||
let mut flags = pg_constants::VISIBILITYMAP_VALID_BITS;
|
||||
|
||||
assert_eq!(decoded.xl_rmid, pg_constants::RM_NEON_ID);
|
||||
|
||||
match pg_version {
|
||||
16 | 17 => {
|
||||
let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
|
||||
|
||||
match info {
|
||||
pg_constants::XLOG_NEON_HEAP_INSERT => {
|
||||
let xlrec = v17::rm_neon::XlNeonHeapInsert::decode(buf);
|
||||
assert_eq!(0, buf.remaining());
|
||||
if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
|
||||
new_heap_blkno = Some(decoded.blocks[0].blkno);
|
||||
}
|
||||
}
|
||||
pg_constants::XLOG_NEON_HEAP_DELETE => {
|
||||
let xlrec = v17::rm_neon::XlNeonHeapDelete::decode(buf);
|
||||
if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
|
||||
new_heap_blkno = Some(decoded.blocks[0].blkno);
|
||||
}
|
||||
}
|
||||
pg_constants::XLOG_NEON_HEAP_UPDATE
|
||||
| pg_constants::XLOG_NEON_HEAP_HOT_UPDATE => {
|
||||
let xlrec = v17::rm_neon::XlNeonHeapUpdate::decode(buf);
|
||||
// the size of tuple data is inferred from the size of the record.
|
||||
// we can't validate the remaining number of bytes without parsing
|
||||
// the tuple data.
|
||||
if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
|
||||
old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
|
||||
}
|
||||
if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
|
||||
// PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
|
||||
// non-HOT update where the new tuple goes to different page than
|
||||
// the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
|
||||
// set.
|
||||
new_heap_blkno = Some(decoded.blocks[0].blkno);
|
||||
}
|
||||
}
|
||||
pg_constants::XLOG_NEON_HEAP_MULTI_INSERT => {
|
||||
let xlrec = v17::rm_neon::XlNeonHeapMultiInsert::decode(buf);
|
||||
|
||||
let offset_array_len =
|
||||
if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 {
|
||||
// the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set
|
||||
0
|
||||
} else {
|
||||
size_of::<u16>() * xlrec.ntuples as usize
|
||||
};
|
||||
assert_eq!(offset_array_len, buf.remaining());
|
||||
|
||||
if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
|
||||
new_heap_blkno = Some(decoded.blocks[0].blkno);
|
||||
}
|
||||
}
|
||||
pg_constants::XLOG_NEON_HEAP_LOCK => {
|
||||
let xlrec = v17::rm_neon::XlNeonHeapLock::decode(buf);
|
||||
if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
|
||||
old_heap_blkno = Some(decoded.blocks[0].blkno);
|
||||
flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
|
||||
}
|
||||
}
|
||||
info => anyhow::bail!("Unknown WAL record type for Neon RMGR: {}", info),
|
||||
}
|
||||
}
|
||||
_ => anyhow::bail!(
|
||||
"Neon RMGR has no known compatibility with PostgreSQL version {}",
|
||||
pg_version
|
||||
),
|
||||
}
|
||||
|
||||
if new_heap_blkno.is_some() || old_heap_blkno.is_some() {
|
||||
let vm_rel = RelTag {
|
||||
forknum: VISIBILITYMAP_FORKNUM,
|
||||
spcnode: decoded.blocks[0].rnode_spcnode,
|
||||
dbnode: decoded.blocks[0].rnode_dbnode,
|
||||
relnode: decoded.blocks[0].rnode_relnode,
|
||||
};
|
||||
|
||||
Ok(Some(MetadataRecord::Neonrmgr(NeonrmgrRecord::ClearVmBits(
|
||||
ClearVmBits {
|
||||
new_heap_blkno,
|
||||
old_heap_blkno,
|
||||
vm_rel,
|
||||
flags,
|
||||
},
|
||||
))))
|
||||
} else {
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
|
||||
fn decode_smgr_record(
|
||||
buf: &mut Bytes,
|
||||
decoded: &DecodedWALRecord,
|
||||
) -> anyhow::Result<Option<MetadataRecord>> {
|
||||
let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
|
||||
if info == pg_constants::XLOG_SMGR_CREATE {
|
||||
let create = XlSmgrCreate::decode(buf);
|
||||
let rel = RelTag {
|
||||
spcnode: create.rnode.spcnode,
|
||||
dbnode: create.rnode.dbnode,
|
||||
relnode: create.rnode.relnode,
|
||||
forknum: create.forknum,
|
||||
};
|
||||
|
||||
return Ok(Some(MetadataRecord::Smgr(SmgrRecord::Create(SmgrCreate {
|
||||
rel,
|
||||
}))));
|
||||
} else if info == pg_constants::XLOG_SMGR_TRUNCATE {
|
||||
let truncate = XlSmgrTruncate::decode(buf);
|
||||
return Ok(Some(MetadataRecord::Smgr(SmgrRecord::Truncate(truncate))));
|
||||
}
|
||||
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
fn decode_dbase_record(
|
||||
buf: &mut Bytes,
|
||||
decoded: &DecodedWALRecord,
|
||||
pg_version: u32,
|
||||
) -> anyhow::Result<Option<MetadataRecord>> {
|
||||
// TODO: Refactor this to avoid the duplication between postgres versions.
|
||||
|
||||
let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
|
||||
tracing::debug!(%info, %pg_version, "handle RM_DBASE_ID");
|
||||
|
||||
if pg_version == 14 {
|
||||
if info == postgres_ffi::v14::bindings::XLOG_DBASE_CREATE {
|
||||
let createdb = XlCreateDatabase::decode(buf);
|
||||
tracing::debug!("XLOG_DBASE_CREATE v14");
|
||||
|
||||
let record = MetadataRecord::Dbase(DbaseRecord::Create(DbaseCreate {
|
||||
db_id: createdb.db_id,
|
||||
tablespace_id: createdb.tablespace_id,
|
||||
src_db_id: createdb.src_db_id,
|
||||
src_tablespace_id: createdb.src_tablespace_id,
|
||||
}));
|
||||
|
||||
return Ok(Some(record));
|
||||
} else if info == postgres_ffi::v14::bindings::XLOG_DBASE_DROP {
|
||||
let dropdb = XlDropDatabase::decode(buf);
|
||||
|
||||
let record = MetadataRecord::Dbase(DbaseRecord::Drop(DbaseDrop {
|
||||
db_id: dropdb.db_id,
|
||||
tablespace_ids: dropdb.tablespace_ids,
|
||||
}));
|
||||
|
||||
return Ok(Some(record));
|
||||
}
|
||||
} else if pg_version == 15 {
|
||||
if info == postgres_ffi::v15::bindings::XLOG_DBASE_CREATE_WAL_LOG {
|
||||
tracing::debug!("XLOG_DBASE_CREATE_WAL_LOG: noop");
|
||||
} else if info == postgres_ffi::v15::bindings::XLOG_DBASE_CREATE_FILE_COPY {
|
||||
// The XLOG record was renamed between v14 and v15,
|
||||
// but the record format is the same.
|
||||
// So we can reuse XlCreateDatabase here.
|
||||
tracing::debug!("XLOG_DBASE_CREATE_FILE_COPY");
|
||||
|
||||
let createdb = XlCreateDatabase::decode(buf);
|
||||
let record = MetadataRecord::Dbase(DbaseRecord::Create(DbaseCreate {
|
||||
db_id: createdb.db_id,
|
||||
tablespace_id: createdb.tablespace_id,
|
||||
src_db_id: createdb.src_db_id,
|
||||
src_tablespace_id: createdb.src_tablespace_id,
|
||||
}));
|
||||
|
||||
return Ok(Some(record));
|
||||
} else if info == postgres_ffi::v15::bindings::XLOG_DBASE_DROP {
|
||||
let dropdb = XlDropDatabase::decode(buf);
|
||||
let record = MetadataRecord::Dbase(DbaseRecord::Drop(DbaseDrop {
|
||||
db_id: dropdb.db_id,
|
||||
tablespace_ids: dropdb.tablespace_ids,
|
||||
}));
|
||||
|
||||
return Ok(Some(record));
|
||||
}
|
||||
} else if pg_version == 16 {
|
||||
if info == postgres_ffi::v16::bindings::XLOG_DBASE_CREATE_WAL_LOG {
|
||||
tracing::debug!("XLOG_DBASE_CREATE_WAL_LOG: noop");
|
||||
} else if info == postgres_ffi::v16::bindings::XLOG_DBASE_CREATE_FILE_COPY {
|
||||
// The XLOG record was renamed between v14 and v15,
|
||||
// but the record format is the same.
|
||||
// So we can reuse XlCreateDatabase here.
|
||||
tracing::debug!("XLOG_DBASE_CREATE_FILE_COPY");
|
||||
|
||||
let createdb = XlCreateDatabase::decode(buf);
|
||||
let record = MetadataRecord::Dbase(DbaseRecord::Create(DbaseCreate {
|
||||
db_id: createdb.db_id,
|
||||
tablespace_id: createdb.tablespace_id,
|
||||
src_db_id: createdb.src_db_id,
|
||||
src_tablespace_id: createdb.src_tablespace_id,
|
||||
}));
|
||||
|
||||
return Ok(Some(record));
|
||||
} else if info == postgres_ffi::v16::bindings::XLOG_DBASE_DROP {
|
||||
let dropdb = XlDropDatabase::decode(buf);
|
||||
let record = MetadataRecord::Dbase(DbaseRecord::Drop(DbaseDrop {
|
||||
db_id: dropdb.db_id,
|
||||
tablespace_ids: dropdb.tablespace_ids,
|
||||
}));
|
||||
|
||||
return Ok(Some(record));
|
||||
}
|
||||
} else if pg_version == 17 {
|
||||
if info == postgres_ffi::v17::bindings::XLOG_DBASE_CREATE_WAL_LOG {
|
||||
tracing::debug!("XLOG_DBASE_CREATE_WAL_LOG: noop");
|
||||
} else if info == postgres_ffi::v17::bindings::XLOG_DBASE_CREATE_FILE_COPY {
|
||||
// The XLOG record was renamed between v14 and v15,
|
||||
// but the record format is the same.
|
||||
// So we can reuse XlCreateDatabase here.
|
||||
tracing::debug!("XLOG_DBASE_CREATE_FILE_COPY");
|
||||
|
||||
let createdb = XlCreateDatabase::decode(buf);
|
||||
let record = MetadataRecord::Dbase(DbaseRecord::Create(DbaseCreate {
|
||||
db_id: createdb.db_id,
|
||||
tablespace_id: createdb.tablespace_id,
|
||||
src_db_id: createdb.src_db_id,
|
||||
src_tablespace_id: createdb.src_tablespace_id,
|
||||
}));
|
||||
|
||||
return Ok(Some(record));
|
||||
} else if info == postgres_ffi::v17::bindings::XLOG_DBASE_DROP {
|
||||
let dropdb = XlDropDatabase::decode(buf);
|
||||
let record = MetadataRecord::Dbase(DbaseRecord::Drop(DbaseDrop {
|
||||
db_id: dropdb.db_id,
|
||||
tablespace_ids: dropdb.tablespace_ids,
|
||||
}));
|
||||
|
||||
return Ok(Some(record));
|
||||
}
|
||||
}
|
||||
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
fn decode_clog_record(
|
||||
buf: &mut Bytes,
|
||||
decoded: &DecodedWALRecord,
|
||||
pg_version: u32,
|
||||
) -> anyhow::Result<Option<MetadataRecord>> {
|
||||
let info = decoded.xl_info & !pg_constants::XLR_INFO_MASK;
|
||||
|
||||
if info == pg_constants::CLOG_ZEROPAGE {
|
||||
let pageno = if pg_version < 17 {
|
||||
buf.get_u32_le()
|
||||
} else {
|
||||
buf.get_u64_le() as u32
|
||||
};
|
||||
let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
|
||||
let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
|
||||
|
||||
Ok(Some(MetadataRecord::Clog(ClogRecord::ZeroPage(
|
||||
ClogZeroPage { segno, rpageno },
|
||||
))))
|
||||
} else {
|
||||
assert!(info == pg_constants::CLOG_TRUNCATE);
|
||||
let xlrec = XlClogTruncate::decode(buf, pg_version);
|
||||
|
||||
Ok(Some(MetadataRecord::Clog(ClogRecord::Truncate(
|
||||
ClogTruncate {
|
||||
pageno: xlrec.pageno,
|
||||
oldest_xid: xlrec.oldest_xid,
|
||||
oldest_xid_db: xlrec.oldest_xid_db,
|
||||
},
|
||||
))))
|
||||
}
|
||||
}
|
||||
|
||||
fn decode_xact_record(
|
||||
buf: &mut Bytes,
|
||||
decoded: &DecodedWALRecord,
|
||||
lsn: Lsn,
|
||||
) -> anyhow::Result<Option<MetadataRecord>> {
|
||||
let info = decoded.xl_info & pg_constants::XLOG_XACT_OPMASK;
|
||||
let origin_id = decoded.origin_id;
|
||||
let xl_xid = decoded.xl_xid;
|
||||
|
||||
if info == pg_constants::XLOG_XACT_COMMIT {
|
||||
let parsed = XlXactParsedRecord::decode(buf, decoded.xl_xid, decoded.xl_info);
|
||||
return Ok(Some(MetadataRecord::Xact(XactRecord::Commit(XactCommon {
|
||||
parsed,
|
||||
origin_id,
|
||||
xl_xid,
|
||||
lsn,
|
||||
}))));
|
||||
} else if info == pg_constants::XLOG_XACT_ABORT {
|
||||
let parsed = XlXactParsedRecord::decode(buf, decoded.xl_xid, decoded.xl_info);
|
||||
return Ok(Some(MetadataRecord::Xact(XactRecord::Abort(XactCommon {
|
||||
parsed,
|
||||
origin_id,
|
||||
xl_xid,
|
||||
lsn,
|
||||
}))));
|
||||
} else if info == pg_constants::XLOG_XACT_COMMIT_PREPARED {
|
||||
let parsed = XlXactParsedRecord::decode(buf, decoded.xl_xid, decoded.xl_info);
|
||||
return Ok(Some(MetadataRecord::Xact(XactRecord::CommitPrepared(
|
||||
XactCommon {
|
||||
parsed,
|
||||
origin_id,
|
||||
xl_xid,
|
||||
lsn,
|
||||
},
|
||||
))));
|
||||
} else if info == pg_constants::XLOG_XACT_ABORT_PREPARED {
|
||||
let parsed = XlXactParsedRecord::decode(buf, decoded.xl_xid, decoded.xl_info);
|
||||
return Ok(Some(MetadataRecord::Xact(XactRecord::AbortPrepared(
|
||||
XactCommon {
|
||||
parsed,
|
||||
origin_id,
|
||||
xl_xid,
|
||||
lsn,
|
||||
},
|
||||
))));
|
||||
} else if info == pg_constants::XLOG_XACT_PREPARE {
|
||||
return Ok(Some(MetadataRecord::Xact(XactRecord::Prepare(
|
||||
XactPrepare {
|
||||
xl_xid: decoded.xl_xid,
|
||||
data: Bytes::copy_from_slice(&buf[..]),
|
||||
},
|
||||
))));
|
||||
}
|
||||
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
fn decode_multixact_record(
|
||||
buf: &mut Bytes,
|
||||
decoded: &DecodedWALRecord,
|
||||
pg_version: u32,
|
||||
) -> anyhow::Result<Option<MetadataRecord>> {
|
||||
let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
|
||||
|
||||
if info == pg_constants::XLOG_MULTIXACT_ZERO_OFF_PAGE
|
||||
|| info == pg_constants::XLOG_MULTIXACT_ZERO_MEM_PAGE
|
||||
{
|
||||
let pageno = if pg_version < 17 {
|
||||
buf.get_u32_le()
|
||||
} else {
|
||||
buf.get_u64_le() as u32
|
||||
};
|
||||
let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
|
||||
let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
|
||||
|
||||
let slru_kind = match info {
|
||||
pg_constants::XLOG_MULTIXACT_ZERO_OFF_PAGE => SlruKind::MultiXactOffsets,
|
||||
pg_constants::XLOG_MULTIXACT_ZERO_MEM_PAGE => SlruKind::MultiXactMembers,
|
||||
_ => unreachable!(),
|
||||
};
|
||||
|
||||
return Ok(Some(MetadataRecord::MultiXact(MultiXactRecord::ZeroPage(
|
||||
MultiXactZeroPage {
|
||||
slru_kind,
|
||||
segno,
|
||||
rpageno,
|
||||
},
|
||||
))));
|
||||
} else if info == pg_constants::XLOG_MULTIXACT_CREATE_ID {
|
||||
let xlrec = XlMultiXactCreate::decode(buf);
|
||||
return Ok(Some(MetadataRecord::MultiXact(MultiXactRecord::Create(
|
||||
xlrec,
|
||||
))));
|
||||
} else if info == pg_constants::XLOG_MULTIXACT_TRUNCATE_ID {
|
||||
let xlrec = XlMultiXactTruncate::decode(buf);
|
||||
return Ok(Some(MetadataRecord::MultiXact(MultiXactRecord::Truncate(
|
||||
xlrec,
|
||||
))));
|
||||
}
|
||||
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
fn decode_relmap_record(
|
||||
buf: &mut Bytes,
|
||||
decoded: &DecodedWALRecord,
|
||||
) -> anyhow::Result<Option<MetadataRecord>> {
|
||||
let update = XlRelmapUpdate::decode(buf);
|
||||
|
||||
let mut buf = decoded.record.clone();
|
||||
buf.advance(decoded.main_data_offset);
|
||||
// skip xl_relmap_update
|
||||
buf.advance(12);
|
||||
|
||||
Ok(Some(MetadataRecord::Relmap(RelmapRecord::Update(
|
||||
RelmapUpdate {
|
||||
update,
|
||||
buf: Bytes::copy_from_slice(&buf[..]),
|
||||
},
|
||||
))))
|
||||
}
|
||||
|
||||
fn decode_xlog_record(
|
||||
buf: &mut Bytes,
|
||||
decoded: &DecodedWALRecord,
|
||||
lsn: Lsn,
|
||||
) -> anyhow::Result<Option<MetadataRecord>> {
|
||||
let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
|
||||
Ok(Some(MetadataRecord::Xlog(XlogRecord::Raw(RawXlogRecord {
|
||||
info,
|
||||
lsn,
|
||||
buf: buf.clone(),
|
||||
}))))
|
||||
}
|
||||
|
||||
fn decode_logical_message_record(
|
||||
buf: &mut Bytes,
|
||||
decoded: &DecodedWALRecord,
|
||||
) -> anyhow::Result<Option<MetadataRecord>> {
|
||||
let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
|
||||
if info == pg_constants::XLOG_LOGICAL_MESSAGE {
|
||||
let xlrec = XlLogicalMessage::decode(buf);
|
||||
let prefix = std::str::from_utf8(&buf[0..xlrec.prefix_size - 1])?;
|
||||
|
||||
#[cfg(feature = "testing")]
|
||||
if prefix == "neon-test" {
|
||||
return Ok(Some(MetadataRecord::LogicalMessage(
|
||||
LogicalMessageRecord::Failpoint,
|
||||
)));
|
||||
}
|
||||
|
||||
if let Some(path) = prefix.strip_prefix("neon-file:") {
|
||||
let buf_size = xlrec.prefix_size + xlrec.message_size;
|
||||
let buf = Bytes::copy_from_slice(&buf[xlrec.prefix_size..buf_size]);
|
||||
return Ok(Some(MetadataRecord::LogicalMessage(
|
||||
LogicalMessageRecord::Put(PutLogicalMessage {
|
||||
path: path.to_string(),
|
||||
buf,
|
||||
}),
|
||||
)));
|
||||
}
|
||||
}
|
||||
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
fn decode_standby_record(
|
||||
buf: &mut Bytes,
|
||||
decoded: &DecodedWALRecord,
|
||||
) -> anyhow::Result<Option<MetadataRecord>> {
|
||||
let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
|
||||
if info == pg_constants::XLOG_RUNNING_XACTS {
|
||||
let xlrec = XlRunningXacts::decode(buf);
|
||||
return Ok(Some(MetadataRecord::Standby(StandbyRecord::RunningXacts(
|
||||
StandbyRunningXacts {
|
||||
oldest_running_xid: xlrec.oldest_running_xid,
|
||||
},
|
||||
))));
|
||||
}
|
||||
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
fn decode_replorigin_record(
|
||||
buf: &mut Bytes,
|
||||
decoded: &DecodedWALRecord,
|
||||
) -> anyhow::Result<Option<MetadataRecord>> {
|
||||
let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
|
||||
if info == pg_constants::XLOG_REPLORIGIN_SET {
|
||||
let xlrec = XlReploriginSet::decode(buf);
|
||||
return Ok(Some(MetadataRecord::Replorigin(ReploriginRecord::Set(
|
||||
xlrec,
|
||||
))));
|
||||
} else if info == pg_constants::XLOG_REPLORIGIN_DROP {
|
||||
let xlrec = XlReploriginDrop::decode(buf);
|
||||
return Ok(Some(MetadataRecord::Replorigin(ReploriginRecord::Drop(
|
||||
xlrec,
|
||||
))));
|
||||
}
|
||||
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -25,9 +25,7 @@
|
||||
//! |--> write to KV store within the pageserver
|
||||
|
||||
use bytes::Bytes;
|
||||
use pageserver_api::key::CompactKey;
|
||||
use pageserver_api::reltag::{RelTag, SlruKind};
|
||||
use pageserver_api::value::Value;
|
||||
use postgres_ffi::walrecord::{
|
||||
XlMultiXactCreate, XlMultiXactTruncate, XlRelmapUpdate, XlReploriginDrop, XlReploriginSet,
|
||||
XlSmgrTruncate, XlXactParsedRecord,
|
||||
@@ -35,48 +33,6 @@ use postgres_ffi::walrecord::{
|
||||
use postgres_ffi::{Oid, TransactionId};
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
pub enum FlushUncommittedRecords {
|
||||
Yes,
|
||||
No,
|
||||
}
|
||||
|
||||
/// An interpreted Postgres WAL record, ready to be handled by the pageserver
|
||||
pub struct InterpretedWalRecord {
|
||||
/// Optional metadata record - may cause writes to metadata keys
|
||||
/// in the storage engine
|
||||
pub metadata_record: Option<MetadataRecord>,
|
||||
/// Images or deltas for blocks modified in the original WAL record.
|
||||
/// The [`Value`] is optional to avoid sending superfluous data to
|
||||
/// shard 0 for relation size tracking.
|
||||
pub blocks: Vec<(CompactKey, Option<Value>)>,
|
||||
/// Byte offset within WAL for the end of the original PG WAL record
|
||||
pub lsn: Lsn,
|
||||
/// Whether to flush all uncommitted modifications to the storage engine
|
||||
/// before ingesting this record. This is currently only used for legacy PG
|
||||
/// database creations which read pages from a template database. Such WAL
|
||||
/// records require reading data blocks while ingesting, hence the need to flush.
|
||||
pub flush_uncommitted: FlushUncommittedRecords,
|
||||
/// Transaction id of the original PG WAL record
|
||||
pub xid: TransactionId,
|
||||
}
|
||||
|
||||
/// The interpreted part of the Postgres WAL record which requires metadata
|
||||
/// writes to the underlying storage engine.
|
||||
pub enum MetadataRecord {
|
||||
Heapam(HeapamRecord),
|
||||
Neonrmgr(NeonrmgrRecord),
|
||||
Smgr(SmgrRecord),
|
||||
Dbase(DbaseRecord),
|
||||
Clog(ClogRecord),
|
||||
Xact(XactRecord),
|
||||
MultiXact(MultiXactRecord),
|
||||
Relmap(RelmapRecord),
|
||||
Xlog(XlogRecord),
|
||||
LogicalMessage(LogicalMessageRecord),
|
||||
Standby(StandbyRecord),
|
||||
Replorigin(ReploriginRecord),
|
||||
}
|
||||
|
||||
pub enum HeapamRecord {
|
||||
ClearVmBits(ClearVmBits),
|
||||
}
|
||||
|
||||
@@ -398,7 +398,9 @@ fn start_pageserver(
|
||||
ControllerUpcallClient::new(conf, &shutdown_pageserver),
|
||||
conf,
|
||||
);
|
||||
deletion_workers.spawn_with(BACKGROUND_RUNTIME.handle());
|
||||
if let Some(deletion_workers) = deletion_workers {
|
||||
deletion_workers.spawn_with(BACKGROUND_RUNTIME.handle());
|
||||
}
|
||||
|
||||
// Up to this point no significant I/O has been done: this should have been fast. Record
|
||||
// duration prior to starting I/O intensive phase of startup.
|
||||
|
||||
@@ -14,7 +14,6 @@ use itertools::Itertools as _;
|
||||
use pageserver_api::models::TenantState;
|
||||
use remote_storage::{GenericRemoteStorage, RemoteStorageConfig};
|
||||
use reqwest::Url;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
use std::time::{Duration, SystemTime};
|
||||
@@ -36,62 +35,12 @@ const DEFAULT_HTTP_REPORTING_TIMEOUT: Duration = Duration::from_secs(60);
|
||||
/// upload attempts.
|
||||
type RawMetric = (MetricsKey, (EventType, u64));
|
||||
|
||||
/// The new serializable metrics format
|
||||
#[derive(Serialize, Deserialize)]
|
||||
struct NewMetricsRoot {
|
||||
version: usize,
|
||||
metrics: Vec<NewRawMetric>,
|
||||
}
|
||||
|
||||
impl NewMetricsRoot {
|
||||
pub fn is_v2_metrics(json_value: &serde_json::Value) -> bool {
|
||||
if let Some(ver) = json_value.get("version") {
|
||||
if let Some(2) = ver.as_u64() {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
/// The new serializable metrics format
|
||||
#[derive(Serialize)]
|
||||
struct NewMetricsRefRoot<'a> {
|
||||
version: usize,
|
||||
metrics: &'a [NewRawMetric],
|
||||
}
|
||||
|
||||
impl<'a> NewMetricsRefRoot<'a> {
|
||||
fn new(metrics: &'a [NewRawMetric]) -> Self {
|
||||
Self {
|
||||
version: 2,
|
||||
metrics,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// The new serializable metrics format
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
|
||||
struct NewRawMetric {
|
||||
key: MetricsKey,
|
||||
kind: EventType,
|
||||
value: u64,
|
||||
// TODO: add generation field and check against generations
|
||||
}
|
||||
|
||||
impl NewRawMetric {
|
||||
#[cfg(test)]
|
||||
fn to_kv_pair(&self) -> (MetricsKey, NewRawMetric) {
|
||||
(self.key, self.clone())
|
||||
}
|
||||
}
|
||||
|
||||
/// Caches the [`RawMetric`]s
|
||||
///
|
||||
/// In practice, during startup, last sent values are stored here to be used in calculating new
|
||||
/// ones. After successful uploading, the cached values are updated to cache. This used to be used
|
||||
/// for deduplication, but that is no longer needed.
|
||||
type Cache = HashMap<MetricsKey, NewRawMetric>;
|
||||
type Cache = HashMap<MetricsKey, (EventType, u64)>;
|
||||
|
||||
pub async fn run(
|
||||
conf: &'static PageServerConf,
|
||||
@@ -282,14 +231,11 @@ async fn restore_and_reschedule(
|
||||
// collect_all_metrics
|
||||
let earlier_metric_at = found_some
|
||||
.iter()
|
||||
.map(|item| item.kind.recorded_at())
|
||||
.map(|(_, (et, _))| et.recorded_at())
|
||||
.copied()
|
||||
.next();
|
||||
|
||||
let cached = found_some
|
||||
.into_iter()
|
||||
.map(|item| (item.key, item))
|
||||
.collect::<Cache>();
|
||||
let cached = found_some.into_iter().collect::<Cache>();
|
||||
|
||||
(cached, earlier_metric_at)
|
||||
}
|
||||
|
||||
@@ -2,33 +2,11 @@ use anyhow::Context;
|
||||
use camino::{Utf8Path, Utf8PathBuf};
|
||||
use std::sync::Arc;
|
||||
|
||||
use crate::consumption_metrics::NewMetricsRefRoot;
|
||||
|
||||
use super::{NewMetricsRoot, NewRawMetric, RawMetric};
|
||||
|
||||
pub(super) fn read_metrics_from_serde_value(
|
||||
json_value: serde_json::Value,
|
||||
) -> anyhow::Result<Vec<NewRawMetric>> {
|
||||
if NewMetricsRoot::is_v2_metrics(&json_value) {
|
||||
let root = serde_json::from_value::<NewMetricsRoot>(json_value)?;
|
||||
Ok(root.metrics)
|
||||
} else {
|
||||
let all_metrics = serde_json::from_value::<Vec<RawMetric>>(json_value)?;
|
||||
let all_metrics = all_metrics
|
||||
.into_iter()
|
||||
.map(|(key, (event_type, value))| NewRawMetric {
|
||||
key,
|
||||
kind: event_type,
|
||||
value,
|
||||
})
|
||||
.collect();
|
||||
Ok(all_metrics)
|
||||
}
|
||||
}
|
||||
use super::RawMetric;
|
||||
|
||||
pub(super) async fn read_metrics_from_disk(
|
||||
path: Arc<Utf8PathBuf>,
|
||||
) -> anyhow::Result<Vec<NewRawMetric>> {
|
||||
) -> anyhow::Result<Vec<RawMetric>> {
|
||||
// do not add context to each error, callsite will log with full path
|
||||
let span = tracing::Span::current();
|
||||
tokio::task::spawn_blocking(move || {
|
||||
@@ -42,8 +20,7 @@ pub(super) async fn read_metrics_from_disk(
|
||||
|
||||
let mut file = std::fs::File::open(&*path)?;
|
||||
let reader = std::io::BufReader::new(&mut file);
|
||||
let json_value = serde_json::from_reader::<_, serde_json::Value>(reader)?;
|
||||
read_metrics_from_serde_value(json_value)
|
||||
anyhow::Ok(serde_json::from_reader::<_, Vec<RawMetric>>(reader)?)
|
||||
})
|
||||
.await
|
||||
.context("read metrics join error")
|
||||
@@ -86,7 +63,7 @@ fn scan_and_delete_with_same_prefix(path: &Utf8Path) -> std::io::Result<()> {
|
||||
}
|
||||
|
||||
pub(super) async fn flush_metrics_to_disk(
|
||||
current_metrics: &Arc<Vec<NewRawMetric>>,
|
||||
current_metrics: &Arc<Vec<RawMetric>>,
|
||||
path: &Arc<Utf8PathBuf>,
|
||||
) -> anyhow::Result<()> {
|
||||
use std::io::Write;
|
||||
@@ -116,11 +93,8 @@ pub(super) async fn flush_metrics_to_disk(
|
||||
// write out all of the raw metrics, to be read out later on restart as cached values
|
||||
{
|
||||
let mut writer = std::io::BufWriter::new(&mut tempfile);
|
||||
serde_json::to_writer(
|
||||
&mut writer,
|
||||
&NewMetricsRefRoot::new(current_metrics.as_ref()),
|
||||
)
|
||||
.context("serialize metrics")?;
|
||||
serde_json::to_writer(&mut writer, &*current_metrics)
|
||||
.context("serialize metrics")?;
|
||||
writer
|
||||
.into_inner()
|
||||
.map_err(|_| anyhow::anyhow!("flushing metrics failed"))?;
|
||||
|
||||
@@ -9,7 +9,7 @@ use utils::{
|
||||
lsn::Lsn,
|
||||
};
|
||||
|
||||
use super::{Cache, NewRawMetric};
|
||||
use super::{Cache, RawMetric};
|
||||
|
||||
/// Name of the metric, used by `MetricsKey` factory methods and `deserialize_cached_events`
|
||||
/// instead of static str.
|
||||
@@ -64,21 +64,11 @@ impl MetricsKey {
|
||||
struct AbsoluteValueFactory(MetricsKey);
|
||||
|
||||
impl AbsoluteValueFactory {
|
||||
#[cfg(test)]
|
||||
const fn at_old_format(self, time: DateTime<Utc>, val: u64) -> super::RawMetric {
|
||||
const fn at(self, time: DateTime<Utc>, val: u64) -> RawMetric {
|
||||
let key = self.0;
|
||||
(key, (EventType::Absolute { time }, val))
|
||||
}
|
||||
|
||||
const fn at(self, time: DateTime<Utc>, val: u64) -> NewRawMetric {
|
||||
let key = self.0;
|
||||
NewRawMetric {
|
||||
key,
|
||||
kind: EventType::Absolute { time },
|
||||
value: val,
|
||||
}
|
||||
}
|
||||
|
||||
fn key(&self) -> &MetricsKey {
|
||||
&self.0
|
||||
}
|
||||
@@ -94,28 +84,7 @@ impl IncrementalValueFactory {
|
||||
prev_end: DateTime<Utc>,
|
||||
up_to: DateTime<Utc>,
|
||||
val: u64,
|
||||
) -> NewRawMetric {
|
||||
let key = self.0;
|
||||
// cannot assert prev_end < up_to because these are realtime clock based
|
||||
let when = EventType::Incremental {
|
||||
start_time: prev_end,
|
||||
stop_time: up_to,
|
||||
};
|
||||
NewRawMetric {
|
||||
key,
|
||||
kind: when,
|
||||
value: val,
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(clippy::wrong_self_convention)]
|
||||
#[cfg(test)]
|
||||
const fn from_until_old_format(
|
||||
self,
|
||||
prev_end: DateTime<Utc>,
|
||||
up_to: DateTime<Utc>,
|
||||
val: u64,
|
||||
) -> super::RawMetric {
|
||||
) -> RawMetric {
|
||||
let key = self.0;
|
||||
// cannot assert prev_end < up_to because these are realtime clock based
|
||||
let when = EventType::Incremental {
|
||||
@@ -216,7 +185,7 @@ pub(super) async fn collect_all_metrics(
|
||||
tenant_manager: &Arc<TenantManager>,
|
||||
cached_metrics: &Cache,
|
||||
ctx: &RequestContext,
|
||||
) -> Vec<NewRawMetric> {
|
||||
) -> Vec<RawMetric> {
|
||||
use pageserver_api::models::TenantState;
|
||||
|
||||
let started_at = std::time::Instant::now();
|
||||
@@ -251,11 +220,11 @@ pub(super) async fn collect_all_metrics(
|
||||
res
|
||||
}
|
||||
|
||||
async fn collect<S>(tenants: S, cache: &Cache, ctx: &RequestContext) -> Vec<NewRawMetric>
|
||||
async fn collect<S>(tenants: S, cache: &Cache, ctx: &RequestContext) -> Vec<RawMetric>
|
||||
where
|
||||
S: futures::stream::Stream<Item = (TenantId, Arc<crate::tenant::Tenant>)>,
|
||||
{
|
||||
let mut current_metrics: Vec<NewRawMetric> = Vec::new();
|
||||
let mut current_metrics: Vec<RawMetric> = Vec::new();
|
||||
|
||||
let mut tenants = std::pin::pin!(tenants);
|
||||
|
||||
@@ -322,7 +291,7 @@ impl TenantSnapshot {
|
||||
tenant_id: TenantId,
|
||||
now: DateTime<Utc>,
|
||||
cached: &Cache,
|
||||
metrics: &mut Vec<NewRawMetric>,
|
||||
metrics: &mut Vec<RawMetric>,
|
||||
) {
|
||||
let remote_size = MetricsKey::remote_storage_size(tenant_id).at(now, self.remote_size);
|
||||
|
||||
@@ -333,9 +302,9 @@ impl TenantSnapshot {
|
||||
let mut synthetic_size = self.synthetic_size;
|
||||
|
||||
if synthetic_size == 0 {
|
||||
if let Some(item) = cached.get(factory.key()) {
|
||||
// use the latest value from previous session, TODO: check generation number
|
||||
synthetic_size = item.value;
|
||||
if let Some((_, value)) = cached.get(factory.key()) {
|
||||
// use the latest value from previous session
|
||||
synthetic_size = *value;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -412,36 +381,37 @@ impl TimelineSnapshot {
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
now: DateTime<Utc>,
|
||||
metrics: &mut Vec<NewRawMetric>,
|
||||
metrics: &mut Vec<RawMetric>,
|
||||
cache: &Cache,
|
||||
) {
|
||||
let timeline_written_size = u64::from(self.last_record_lsn);
|
||||
|
||||
let written_size_delta_key = MetricsKey::written_size_delta(tenant_id, timeline_id);
|
||||
|
||||
let last_stop_time = cache.get(written_size_delta_key.key()).map(|item| {
|
||||
item.kind
|
||||
.incremental_timerange()
|
||||
.expect("never create EventType::Absolute for written_size_delta")
|
||||
.end
|
||||
});
|
||||
let last_stop_time = cache
|
||||
.get(written_size_delta_key.key())
|
||||
.map(|(until, _val)| {
|
||||
until
|
||||
.incremental_timerange()
|
||||
.expect("never create EventType::Absolute for written_size_delta")
|
||||
.end
|
||||
});
|
||||
|
||||
let written_size_now =
|
||||
let (key, written_size_now) =
|
||||
MetricsKey::written_size(tenant_id, timeline_id).at(now, timeline_written_size);
|
||||
|
||||
// by default, use the last sent written_size as the basis for
|
||||
// calculating the delta. if we don't yet have one, use the load time value.
|
||||
let prev: (DateTime<Utc>, u64) = cache
|
||||
.get(&written_size_now.key)
|
||||
.map(|item| {
|
||||
let prev = cache
|
||||
.get(&key)
|
||||
.map(|(prev_at, prev)| {
|
||||
// use the prev time from our last incremental update, or default to latest
|
||||
// absolute update on the first round.
|
||||
let prev_at = item
|
||||
.kind
|
||||
let prev_at = prev_at
|
||||
.absolute_time()
|
||||
.expect("never create EventType::Incremental for written_size");
|
||||
let prev_at = last_stop_time.unwrap_or(prev_at);
|
||||
(*prev_at, item.value)
|
||||
(*prev_at, *prev)
|
||||
})
|
||||
.unwrap_or_else(|| {
|
||||
// if we don't have a previous point of comparison, compare to the load time
|
||||
@@ -452,28 +422,24 @@ impl TimelineSnapshot {
|
||||
|
||||
let up_to = now;
|
||||
|
||||
if let Some(delta) = written_size_now.value.checked_sub(prev.1) {
|
||||
if let Some(delta) = written_size_now.1.checked_sub(prev.1) {
|
||||
let key_value = written_size_delta_key.from_until(prev.0, up_to, delta);
|
||||
// written_size_delta
|
||||
metrics.push(key_value);
|
||||
// written_size
|
||||
metrics.push(written_size_now);
|
||||
metrics.push((key, written_size_now));
|
||||
} else {
|
||||
// the cached value was ahead of us, report zero until we've caught up
|
||||
metrics.push(written_size_delta_key.from_until(prev.0, up_to, 0));
|
||||
// the cached value was ahead of us, report the same until we've caught up
|
||||
metrics.push(NewRawMetric {
|
||||
key: written_size_now.key,
|
||||
kind: written_size_now.kind,
|
||||
value: prev.1,
|
||||
});
|
||||
metrics.push((key, (written_size_now.0, prev.1)));
|
||||
}
|
||||
|
||||
{
|
||||
let factory = MetricsKey::timeline_logical_size(tenant_id, timeline_id);
|
||||
let current_or_previous = self
|
||||
.current_exact_logical_size
|
||||
.or_else(|| cache.get(factory.key()).map(|item| item.value));
|
||||
.or_else(|| cache.get(factory.key()).map(|(_, val)| *val));
|
||||
|
||||
if let Some(size) = current_or_previous {
|
||||
metrics.push(factory.at(now, size));
|
||||
@@ -486,4 +452,4 @@ impl TimelineSnapshot {
|
||||
mod tests;
|
||||
|
||||
#[cfg(test)]
|
||||
pub(crate) use tests::{metric_examples, metric_examples_old};
|
||||
pub(crate) use tests::metric_examples;
|
||||
|
||||
@@ -1,5 +1,3 @@
|
||||
use crate::consumption_metrics::RawMetric;
|
||||
|
||||
use super::*;
|
||||
use std::collections::HashMap;
|
||||
|
||||
@@ -52,9 +50,9 @@ fn startup_collected_timeline_metrics_second_round() {
|
||||
let disk_consistent_lsn = Lsn(initdb_lsn.0 * 2);
|
||||
|
||||
let mut metrics = Vec::new();
|
||||
let cache = HashMap::from([MetricsKey::written_size(tenant_id, timeline_id)
|
||||
.at(before, disk_consistent_lsn.0)
|
||||
.to_kv_pair()]);
|
||||
let cache = HashMap::from([
|
||||
MetricsKey::written_size(tenant_id, timeline_id).at(before, disk_consistent_lsn.0)
|
||||
]);
|
||||
|
||||
let snap = TimelineSnapshot {
|
||||
loaded_at: (disk_consistent_lsn, init),
|
||||
@@ -91,13 +89,9 @@ fn startup_collected_timeline_metrics_nth_round_at_same_lsn() {
|
||||
let mut metrics = Vec::new();
|
||||
let cache = HashMap::from([
|
||||
// at t=before was the last time the last_record_lsn changed
|
||||
MetricsKey::written_size(tenant_id, timeline_id)
|
||||
.at(before, disk_consistent_lsn.0)
|
||||
.to_kv_pair(),
|
||||
MetricsKey::written_size(tenant_id, timeline_id).at(before, disk_consistent_lsn.0),
|
||||
// end time of this event is used for the next ones
|
||||
MetricsKey::written_size_delta(tenant_id, timeline_id)
|
||||
.from_until(before, just_before, 0)
|
||||
.to_kv_pair(),
|
||||
MetricsKey::written_size_delta(tenant_id, timeline_id).from_until(before, just_before, 0),
|
||||
]);
|
||||
|
||||
let snap = TimelineSnapshot {
|
||||
@@ -144,17 +138,13 @@ fn post_restart_written_sizes_with_rolled_back_last_record_lsn() {
|
||||
};
|
||||
|
||||
let mut cache = HashMap::from([
|
||||
MetricsKey::written_size(tenant_id, timeline_id)
|
||||
.at(before_restart, 100)
|
||||
.to_kv_pair(),
|
||||
MetricsKey::written_size_delta(tenant_id, timeline_id)
|
||||
.from_until(
|
||||
way_before,
|
||||
before_restart,
|
||||
// not taken into account, but the timestamps are important
|
||||
999_999_999,
|
||||
)
|
||||
.to_kv_pair(),
|
||||
MetricsKey::written_size(tenant_id, timeline_id).at(before_restart, 100),
|
||||
MetricsKey::written_size_delta(tenant_id, timeline_id).from_until(
|
||||
way_before,
|
||||
before_restart,
|
||||
// not taken into account, but the timestamps are important
|
||||
999_999_999,
|
||||
),
|
||||
]);
|
||||
|
||||
let mut metrics = Vec::new();
|
||||
@@ -173,7 +163,7 @@ fn post_restart_written_sizes_with_rolled_back_last_record_lsn() {
|
||||
);
|
||||
|
||||
// now if we cache these metrics, and re-run while "still in recovery"
|
||||
cache.extend(metrics.drain(..).map(|x| x.to_kv_pair()));
|
||||
cache.extend(metrics.drain(..));
|
||||
|
||||
// "still in recovery", because our snapshot did not change
|
||||
snap.to_metrics(tenant_id, timeline_id, later, &mut metrics, &cache);
|
||||
@@ -204,14 +194,14 @@ fn post_restart_current_exact_logical_size_uses_cached() {
|
||||
current_exact_logical_size: None,
|
||||
};
|
||||
|
||||
let cache = HashMap::from([MetricsKey::timeline_logical_size(tenant_id, timeline_id)
|
||||
.at(before_restart, 100)
|
||||
.to_kv_pair()]);
|
||||
let cache = HashMap::from([
|
||||
MetricsKey::timeline_logical_size(tenant_id, timeline_id).at(before_restart, 100)
|
||||
]);
|
||||
|
||||
let mut metrics = Vec::new();
|
||||
snap.to_metrics(tenant_id, timeline_id, now, &mut metrics, &cache);
|
||||
|
||||
metrics.retain(|item| item.key.metric == Name::LogicalSize);
|
||||
metrics.retain(|(key, _)| key.metric == Name::LogicalSize);
|
||||
|
||||
assert_eq!(
|
||||
metrics,
|
||||
@@ -234,9 +224,7 @@ fn post_restart_synthetic_size_uses_cached_if_available() {
|
||||
let before_restart = DateTime::<Utc>::from(now - std::time::Duration::from_secs(5 * 60));
|
||||
let now = DateTime::<Utc>::from(now);
|
||||
|
||||
let cached = HashMap::from([MetricsKey::synthetic_size(tenant_id)
|
||||
.at(before_restart, 1000)
|
||||
.to_kv_pair()]);
|
||||
let cached = HashMap::from([MetricsKey::synthetic_size(tenant_id).at(before_restart, 1000)]);
|
||||
|
||||
let mut metrics = Vec::new();
|
||||
ts.to_metrics(tenant_id, now, &cached, &mut metrics);
|
||||
@@ -290,29 +278,12 @@ fn time_backwards<const N: usize>() -> [std::time::SystemTime; N] {
|
||||
times
|
||||
}
|
||||
|
||||
pub(crate) const fn metric_examples_old(
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
now: DateTime<Utc>,
|
||||
before: DateTime<Utc>,
|
||||
) -> [RawMetric; 6] {
|
||||
[
|
||||
MetricsKey::written_size(tenant_id, timeline_id).at_old_format(now, 0),
|
||||
MetricsKey::written_size_delta(tenant_id, timeline_id)
|
||||
.from_until_old_format(before, now, 0),
|
||||
MetricsKey::timeline_logical_size(tenant_id, timeline_id).at_old_format(now, 0),
|
||||
MetricsKey::remote_storage_size(tenant_id).at_old_format(now, 0),
|
||||
MetricsKey::resident_size(tenant_id).at_old_format(now, 0),
|
||||
MetricsKey::synthetic_size(tenant_id).at_old_format(now, 1),
|
||||
]
|
||||
}
|
||||
|
||||
pub(crate) const fn metric_examples(
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
now: DateTime<Utc>,
|
||||
before: DateTime<Utc>,
|
||||
) -> [NewRawMetric; 6] {
|
||||
) -> [RawMetric; 6] {
|
||||
[
|
||||
MetricsKey::written_size(tenant_id, timeline_id).at(now, 0),
|
||||
MetricsKey::written_size_delta(tenant_id, timeline_id).from_until(before, now, 0),
|
||||
|
||||
@@ -7,7 +7,7 @@ use tokio::io::AsyncWriteExt;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::Instrument;
|
||||
|
||||
use super::{metrics::Name, Cache, MetricsKey, NewRawMetric, RawMetric};
|
||||
use super::{metrics::Name, Cache, MetricsKey, RawMetric};
|
||||
use utils::id::{TenantId, TimelineId};
|
||||
|
||||
/// How the metrics from pageserver are identified.
|
||||
@@ -24,7 +24,7 @@ pub(super) async fn upload_metrics_http(
|
||||
client: &reqwest::Client,
|
||||
metric_collection_endpoint: &reqwest::Url,
|
||||
cancel: &CancellationToken,
|
||||
metrics: &[NewRawMetric],
|
||||
metrics: &[RawMetric],
|
||||
cached_metrics: &mut Cache,
|
||||
idempotency_keys: &[IdempotencyKey<'_>],
|
||||
) -> anyhow::Result<()> {
|
||||
@@ -53,8 +53,8 @@ pub(super) async fn upload_metrics_http(
|
||||
|
||||
match res {
|
||||
Ok(()) => {
|
||||
for item in chunk {
|
||||
cached_metrics.insert(item.key, item.clone());
|
||||
for (curr_key, curr_val) in chunk {
|
||||
cached_metrics.insert(*curr_key, *curr_val);
|
||||
}
|
||||
uploaded += chunk.len();
|
||||
}
|
||||
@@ -86,7 +86,7 @@ pub(super) async fn upload_metrics_bucket(
|
||||
client: &GenericRemoteStorage,
|
||||
cancel: &CancellationToken,
|
||||
node_id: &str,
|
||||
metrics: &[NewRawMetric],
|
||||
metrics: &[RawMetric],
|
||||
idempotency_keys: &[IdempotencyKey<'_>],
|
||||
) -> anyhow::Result<()> {
|
||||
if metrics.is_empty() {
|
||||
@@ -140,16 +140,16 @@ pub(super) async fn upload_metrics_bucket(
|
||||
/// across different metrics sinks), and must have the same length as input.
|
||||
fn serialize_in_chunks<'a>(
|
||||
chunk_size: usize,
|
||||
input: &'a [NewRawMetric],
|
||||
input: &'a [RawMetric],
|
||||
idempotency_keys: &'a [IdempotencyKey<'a>],
|
||||
) -> impl ExactSizeIterator<Item = Result<(&'a [NewRawMetric], bytes::Bytes), serde_json::Error>> + 'a
|
||||
) -> impl ExactSizeIterator<Item = Result<(&'a [RawMetric], bytes::Bytes), serde_json::Error>> + 'a
|
||||
{
|
||||
use bytes::BufMut;
|
||||
|
||||
assert_eq!(input.len(), idempotency_keys.len());
|
||||
|
||||
struct Iter<'a> {
|
||||
inner: std::slice::Chunks<'a, NewRawMetric>,
|
||||
inner: std::slice::Chunks<'a, RawMetric>,
|
||||
idempotency_keys: std::slice::Iter<'a, IdempotencyKey<'a>>,
|
||||
chunk_size: usize,
|
||||
|
||||
@@ -160,7 +160,7 @@ fn serialize_in_chunks<'a>(
|
||||
}
|
||||
|
||||
impl<'a> Iterator for Iter<'a> {
|
||||
type Item = Result<(&'a [NewRawMetric], bytes::Bytes), serde_json::Error>;
|
||||
type Item = Result<(&'a [RawMetric], bytes::Bytes), serde_json::Error>;
|
||||
|
||||
fn next(&mut self) -> Option<Self::Item> {
|
||||
let chunk = self.inner.next()?;
|
||||
@@ -269,58 +269,6 @@ impl RawMetricExt for RawMetric {
|
||||
}
|
||||
}
|
||||
|
||||
impl RawMetricExt for NewRawMetric {
|
||||
fn as_event(&self, key: &IdempotencyKey<'_>) -> Event<Ids, Name> {
|
||||
let MetricsKey {
|
||||
metric,
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
} = self.key;
|
||||
|
||||
let kind = self.kind;
|
||||
let value = self.value;
|
||||
|
||||
Event {
|
||||
kind,
|
||||
metric,
|
||||
idempotency_key: key.to_string(),
|
||||
value,
|
||||
extra: Ids {
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
fn update_in_place(&self, event: &mut Event<Ids, Name>, key: &IdempotencyKey<'_>) {
|
||||
use std::fmt::Write;
|
||||
|
||||
let MetricsKey {
|
||||
metric,
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
} = self.key;
|
||||
|
||||
let kind = self.kind;
|
||||
let value = self.value;
|
||||
|
||||
*event = Event {
|
||||
kind,
|
||||
metric,
|
||||
idempotency_key: {
|
||||
event.idempotency_key.clear();
|
||||
write!(event.idempotency_key, "{key}").unwrap();
|
||||
std::mem::take(&mut event.idempotency_key)
|
||||
},
|
||||
value,
|
||||
extra: Ids {
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
},
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) trait KeyGen<'a> {
|
||||
fn generate(&self) -> IdempotencyKey<'a>;
|
||||
}
|
||||
@@ -433,10 +381,6 @@ async fn upload(
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use crate::consumption_metrics::{
|
||||
disk_cache::read_metrics_from_serde_value, NewMetricsRefRoot,
|
||||
};
|
||||
|
||||
use super::*;
|
||||
use chrono::{DateTime, Utc};
|
||||
use once_cell::sync::Lazy;
|
||||
@@ -529,49 +473,23 @@ mod tests {
|
||||
let idempotency_key = consumption_metrics::IdempotencyKey::for_tests(*SAMPLES_NOW, "1", 0);
|
||||
let examples = examples.into_iter().zip(metric_samples());
|
||||
|
||||
for ((line, expected), item) in examples {
|
||||
for ((line, expected), (key, (kind, value))) in examples {
|
||||
let e = consumption_metrics::Event {
|
||||
kind: item.kind,
|
||||
metric: item.key.metric,
|
||||
kind,
|
||||
metric: key.metric,
|
||||
idempotency_key: idempotency_key.to_string(),
|
||||
value: item.value,
|
||||
value,
|
||||
extra: Ids {
|
||||
tenant_id: item.key.tenant_id,
|
||||
timeline_id: item.key.timeline_id,
|
||||
tenant_id: key.tenant_id,
|
||||
timeline_id: key.timeline_id,
|
||||
},
|
||||
};
|
||||
let actual = serde_json::to_string(&e).unwrap();
|
||||
assert_eq!(
|
||||
expected, actual,
|
||||
"example for {:?} from line {line}",
|
||||
item.kind
|
||||
);
|
||||
assert_eq!(expected, actual, "example for {kind:?} from line {line}");
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn disk_format_upgrade() {
|
||||
let old_samples_json = serde_json::to_value(metric_samples_old()).unwrap();
|
||||
let new_samples =
|
||||
serde_json::to_value(NewMetricsRefRoot::new(metric_samples().as_ref())).unwrap();
|
||||
let upgraded_samples = read_metrics_from_serde_value(old_samples_json).unwrap();
|
||||
let new_samples = read_metrics_from_serde_value(new_samples).unwrap();
|
||||
assert_eq!(upgraded_samples, new_samples);
|
||||
}
|
||||
|
||||
fn metric_samples_old() -> [RawMetric; 6] {
|
||||
let tenant_id = TenantId::from_array([0; 16]);
|
||||
let timeline_id = TimelineId::from_array([0xff; 16]);
|
||||
|
||||
let before = DateTime::parse_from_rfc3339("2023-09-14T00:00:00.123456789Z")
|
||||
.unwrap()
|
||||
.into();
|
||||
let [now, before] = [*SAMPLES_NOW, before];
|
||||
|
||||
super::super::metrics::metric_examples_old(tenant_id, timeline_id, now, before)
|
||||
}
|
||||
|
||||
fn metric_samples() -> [NewRawMetric; 6] {
|
||||
fn metric_samples() -> [RawMetric; 6] {
|
||||
let tenant_id = TenantId::from_array([0; 16]);
|
||||
let timeline_id = TimelineId::from_array([0xff; 16]);
|
||||
|
||||
|
||||
@@ -618,11 +618,13 @@ impl DeletionQueue {
|
||||
/// Caller may use the returned object to construct clients with new_client.
|
||||
/// Caller should tokio::spawn the background() members of the two worker objects returned:
|
||||
/// we don't spawn those inside new() so that the caller can use their runtime/spans of choice.
|
||||
///
|
||||
/// If remote_storage is None, then the returned workers will also be None.
|
||||
pub fn new<C>(
|
||||
remote_storage: GenericRemoteStorage,
|
||||
controller_upcall_client: Option<C>,
|
||||
conf: &'static PageServerConf,
|
||||
) -> (Self, DeletionQueueWorkers<C>)
|
||||
) -> (Self, Option<DeletionQueueWorkers<C>>)
|
||||
where
|
||||
C: ControlPlaneGenerationsApi + Send + Sync,
|
||||
{
|
||||
@@ -654,7 +656,7 @@ impl DeletionQueue {
|
||||
},
|
||||
cancel: cancel.clone(),
|
||||
},
|
||||
DeletionQueueWorkers {
|
||||
Some(DeletionQueueWorkers {
|
||||
frontend: ListWriter::new(conf, rx, backend_tx, cancel.clone()),
|
||||
backend: Validator::new(
|
||||
conf,
|
||||
@@ -665,7 +667,7 @@ impl DeletionQueue {
|
||||
cancel.clone(),
|
||||
),
|
||||
executor: Deleter::new(remote_storage, executor_rx, cancel.clone()),
|
||||
},
|
||||
}),
|
||||
)
|
||||
}
|
||||
|
||||
@@ -740,7 +742,9 @@ mod test {
|
||||
);
|
||||
|
||||
tracing::debug!("Spawning worker for new queue queue");
|
||||
let worker_join = workers.spawn_with(&tokio::runtime::Handle::current());
|
||||
let worker_join = workers
|
||||
.unwrap()
|
||||
.spawn_with(&tokio::runtime::Handle::current());
|
||||
|
||||
let old_worker_join = std::mem::replace(&mut self.worker_join, worker_join);
|
||||
let old_deletion_queue = std::mem::replace(&mut self.deletion_queue, deletion_queue);
|
||||
@@ -851,6 +855,7 @@ mod test {
|
||||
harness.conf,
|
||||
);
|
||||
|
||||
let worker = worker.unwrap();
|
||||
let worker_join = worker.spawn_with(&tokio::runtime::Handle::current());
|
||||
|
||||
Ok(TestSetup {
|
||||
|
||||
@@ -80,7 +80,6 @@ use crate::tenant::size::ModelInputs;
|
||||
use crate::tenant::storage_layer::LayerAccessStatsReset;
|
||||
use crate::tenant::storage_layer::LayerName;
|
||||
use crate::tenant::timeline::offload::offload_timeline;
|
||||
use crate::tenant::timeline::offload::OffloadError;
|
||||
use crate::tenant::timeline::CompactFlags;
|
||||
use crate::tenant::timeline::CompactionError;
|
||||
use crate::tenant::timeline::Timeline;
|
||||
@@ -2005,12 +2004,7 @@ async fn timeline_offload_handler(
|
||||
}
|
||||
offload_timeline(&tenant, &timeline)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
match e {
|
||||
OffloadError::Cancelled => ApiError::ResourceUnavailable("Timeline shutting down".into()),
|
||||
_ => ApiError::InternalServerError(anyhow!(e))
|
||||
}
|
||||
})?;
|
||||
.map_err(ApiError::InternalServerError)?;
|
||||
|
||||
json_response(StatusCode::OK, ())
|
||||
}
|
||||
@@ -2066,7 +2060,6 @@ async fn timeline_checkpoint_handler(
|
||||
.map_err(|e|
|
||||
match e {
|
||||
CompactionError::ShuttingDown => ApiError::ShuttingDown,
|
||||
CompactionError::Offload(e) => ApiError::InternalServerError(anyhow::anyhow!(e)),
|
||||
CompactionError::Other(e) => ApiError::InternalServerError(e)
|
||||
}
|
||||
)?;
|
||||
|
||||
@@ -12,7 +12,6 @@ use pageserver_api::key::rel_block_to_key;
|
||||
use tokio::io::{AsyncRead, AsyncReadExt};
|
||||
use tokio_tar::Archive;
|
||||
use tracing::*;
|
||||
use wal_decoder::models::InterpretedWalRecord;
|
||||
use walkdir::WalkDir;
|
||||
|
||||
use crate::context::RequestContext;
|
||||
@@ -24,6 +23,7 @@ use pageserver_api::reltag::{RelTag, SlruKind};
|
||||
use postgres_ffi::pg_constants;
|
||||
use postgres_ffi::relfile_utils::*;
|
||||
use postgres_ffi::waldecoder::WalStreamDecoder;
|
||||
use postgres_ffi::walrecord::{decode_wal_record, DecodedWALRecord};
|
||||
use postgres_ffi::ControlFileData;
|
||||
use postgres_ffi::DBState_DB_SHUTDOWNED;
|
||||
use postgres_ffi::Oid;
|
||||
@@ -312,15 +312,11 @@ async fn import_wal(
|
||||
let mut modification = tline.begin_modification(last_lsn);
|
||||
while last_lsn <= endpoint {
|
||||
if let Some((lsn, recdata)) = waldecoder.poll_decode()? {
|
||||
let interpreted = InterpretedWalRecord::from_bytes_filtered(
|
||||
recdata,
|
||||
tline.get_shard_identity(),
|
||||
lsn,
|
||||
tline.pg_version,
|
||||
)?;
|
||||
let mut decoded = DecodedWALRecord::default();
|
||||
decode_wal_record(recdata, &mut decoded, tline.pg_version)?;
|
||||
|
||||
walingest
|
||||
.ingest_record(interpreted, &mut modification, ctx)
|
||||
.ingest_record(decoded, lsn, &mut modification, ctx)
|
||||
.await?;
|
||||
WAL_INGEST.records_committed.inc();
|
||||
|
||||
@@ -457,15 +453,10 @@ pub async fn import_wal_from_tar(
|
||||
let mut modification = tline.begin_modification(last_lsn);
|
||||
while last_lsn <= end_lsn {
|
||||
if let Some((lsn, recdata)) = waldecoder.poll_decode()? {
|
||||
let interpreted = InterpretedWalRecord::from_bytes_filtered(
|
||||
recdata,
|
||||
tline.get_shard_identity(),
|
||||
lsn,
|
||||
tline.pg_version,
|
||||
)?;
|
||||
|
||||
let mut decoded = DecodedWALRecord::default();
|
||||
decode_wal_record(recdata, &mut decoded, tline.pg_version)?;
|
||||
walingest
|
||||
.ingest_record(interpreted, &mut modification, ctx)
|
||||
.ingest_record(decoded, lsn, &mut modification, ctx)
|
||||
.await?;
|
||||
modification.commit(ctx).await?;
|
||||
last_lsn = lsn;
|
||||
|
||||
@@ -521,6 +521,13 @@ pub struct OffloadedTimeline {
|
||||
/// Present for future flattening deliberations.
|
||||
pub archived_at: NaiveDateTime,
|
||||
|
||||
/// Lazily constructed remote client for the timeline
|
||||
///
|
||||
/// If we offload a timeline, we keep around the remote client
|
||||
/// for the duration of the process. If we find it through the
|
||||
/// manifest, we don't construct it up until it's needed (deletion).
|
||||
pub remote_client: Option<Arc<RemoteTimelineClient>>,
|
||||
|
||||
/// Prevent two tasks from deleting the timeline at the same time. If held, the
|
||||
/// timeline is being deleted. If 'true', the timeline has already been deleted.
|
||||
pub delete_progress: TimelineDeleteProgress,
|
||||
@@ -547,6 +554,7 @@ impl OffloadedTimeline {
|
||||
ancestor_retain_lsn,
|
||||
archived_at,
|
||||
|
||||
remote_client: Some(timeline.remote_client.clone()),
|
||||
delete_progress: timeline.delete_progress.clone(),
|
||||
})
|
||||
}
|
||||
@@ -563,6 +571,7 @@ impl OffloadedTimeline {
|
||||
ancestor_timeline_id,
|
||||
ancestor_retain_lsn,
|
||||
archived_at,
|
||||
remote_client: None,
|
||||
delete_progress: TimelineDeleteProgress::default(),
|
||||
}
|
||||
}
|
||||
@@ -627,7 +636,7 @@ impl TimelineOrOffloaded {
|
||||
fn maybe_remote_client(&self) -> Option<Arc<RemoteTimelineClient>> {
|
||||
match self {
|
||||
TimelineOrOffloaded::Timeline(timeline) => Some(timeline.remote_client.clone()),
|
||||
TimelineOrOffloaded::Offloaded(_offloaded) => None,
|
||||
TimelineOrOffloaded::Offloaded(offloaded) => offloaded.remote_client.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -2529,11 +2538,6 @@ impl Tenant {
|
||||
.await
|
||||
.inspect_err(|e| match e {
|
||||
timeline::CompactionError::ShuttingDown => (),
|
||||
timeline::CompactionError::Offload(_) => {
|
||||
// Failures to offload timelines do not trip the circuit breaker, because
|
||||
// they do not do lots of writes the way compaction itself does: it is cheap
|
||||
// to retry, and it would be bad to stop all compaction because of an issue with offloading.
|
||||
}
|
||||
timeline::CompactionError::Other(e) => {
|
||||
self.compaction_circuit_breaker
|
||||
.lock()
|
||||
@@ -2549,7 +2553,8 @@ impl Tenant {
|
||||
if pending_task_left == Some(false) && *can_offload {
|
||||
offload_timeline(self, timeline)
|
||||
.instrument(info_span!("offload_timeline", %timeline_id))
|
||||
.await?;
|
||||
.await
|
||||
.map_err(timeline::CompactionError::Other)?;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -279,7 +279,6 @@ fn log_compaction_error(
|
||||
|
||||
let decision = match e {
|
||||
ShuttingDown => None,
|
||||
Offload(_) => Some(LooksLike::Error),
|
||||
_ if task_cancelled => Some(LooksLike::Info),
|
||||
Other(e) => {
|
||||
let root_cause = e.root_cause();
|
||||
|
||||
@@ -20,7 +20,6 @@ use chrono::{DateTime, Utc};
|
||||
use enumset::EnumSet;
|
||||
use fail::fail_point;
|
||||
use handle::ShardTimelineId;
|
||||
use offload::OffloadError;
|
||||
use once_cell::sync::Lazy;
|
||||
use pageserver_api::{
|
||||
key::{
|
||||
@@ -4476,23 +4475,11 @@ impl Drop for Timeline {
|
||||
pub(crate) enum CompactionError {
|
||||
#[error("The timeline or pageserver is shutting down")]
|
||||
ShuttingDown,
|
||||
/// Compaction tried to offload a timeline and failed
|
||||
#[error("Failed to offload timeline: {0}")]
|
||||
Offload(OffloadError),
|
||||
/// Compaction cannot be done right now; page reconstruction and so on.
|
||||
#[error(transparent)]
|
||||
Other(anyhow::Error),
|
||||
}
|
||||
|
||||
impl From<OffloadError> for CompactionError {
|
||||
fn from(e: OffloadError) -> Self {
|
||||
match e {
|
||||
OffloadError::Cancelled => Self::ShuttingDown,
|
||||
_ => Self::Offload(e),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl CompactionError {
|
||||
pub fn is_cancelled(&self) -> bool {
|
||||
matches!(self, CompactionError::ShuttingDown)
|
||||
|
||||
@@ -18,7 +18,6 @@ use crate::{
|
||||
CreateTimelineCause, DeleteTimelineError, MaybeDeletedIndexPart, Tenant,
|
||||
TimelineOrOffloaded,
|
||||
},
|
||||
virtual_file::MaybeFatalIo,
|
||||
};
|
||||
|
||||
use super::{Timeline, TimelineResources};
|
||||
@@ -63,10 +62,10 @@ pub(super) async fn delete_local_timeline_directory(
|
||||
conf: &PageServerConf,
|
||||
tenant_shard_id: TenantShardId,
|
||||
timeline: &Timeline,
|
||||
) {
|
||||
) -> anyhow::Result<()> {
|
||||
// Always ensure the lock order is compaction -> gc.
|
||||
let compaction_lock = timeline.compaction_lock.lock();
|
||||
let _compaction_lock = crate::timed(
|
||||
let compaction_lock = crate::timed(
|
||||
compaction_lock,
|
||||
"acquires compaction lock",
|
||||
std::time::Duration::from_secs(5),
|
||||
@@ -74,7 +73,7 @@ pub(super) async fn delete_local_timeline_directory(
|
||||
.await;
|
||||
|
||||
let gc_lock = timeline.gc_lock.lock();
|
||||
let _gc_lock = crate::timed(
|
||||
let gc_lock = crate::timed(
|
||||
gc_lock,
|
||||
"acquires gc lock",
|
||||
std::time::Duration::from_secs(5),
|
||||
@@ -86,15 +85,24 @@ pub(super) async fn delete_local_timeline_directory(
|
||||
|
||||
let local_timeline_directory = conf.timeline_path(&tenant_shard_id, &timeline.timeline_id);
|
||||
|
||||
fail::fail_point!("timeline-delete-before-rm", |_| {
|
||||
Err(anyhow::anyhow!("failpoint: timeline-delete-before-rm"))?
|
||||
});
|
||||
|
||||
// NB: This need not be atomic because the deleted flag in the IndexPart
|
||||
// will be observed during tenant/timeline load. The deletion will be resumed there.
|
||||
//
|
||||
// ErrorKind::NotFound can happen e.g. if we race with tenant detach, because,
|
||||
// Note that here we do not bail out on std::io::ErrorKind::NotFound.
|
||||
// This can happen if we're called a second time, e.g.,
|
||||
// because of a previous failure/cancellation at/after
|
||||
// failpoint timeline-delete-after-rm.
|
||||
//
|
||||
// ErrorKind::NotFound can also happen if we race with tenant detach, because,
|
||||
// no locks are shared.
|
||||
tokio::fs::remove_dir_all(local_timeline_directory)
|
||||
.await
|
||||
.or_else(fs_ext::ignore_not_found)
|
||||
.fatal_err("removing timeline directory");
|
||||
.context("remove local timeline directory")?;
|
||||
|
||||
// Make sure previous deletions are ordered before mark removal.
|
||||
// Otherwise there is no guarantee that they reach the disk before mark deletion.
|
||||
@@ -105,9 +113,17 @@ pub(super) async fn delete_local_timeline_directory(
|
||||
let timeline_path = conf.timelines_path(&tenant_shard_id);
|
||||
crashsafe::fsync_async(timeline_path)
|
||||
.await
|
||||
.fatal_err("fsync after removing timeline directory");
|
||||
.context("fsync_pre_mark_remove")?;
|
||||
|
||||
info!("finished deleting layer files, releasing locks");
|
||||
drop(gc_lock);
|
||||
drop(compaction_lock);
|
||||
|
||||
fail::fail_point!("timeline-delete-after-rm", |_| {
|
||||
Err(anyhow::anyhow!("failpoint: timeline-delete-after-rm"))?
|
||||
});
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Removes remote layers and an index file after them.
|
||||
@@ -198,8 +214,7 @@ impl DeleteTimelineFlow {
|
||||
) -> Result<(), DeleteTimelineError> {
|
||||
super::debug_assert_current_span_has_tenant_and_timeline_id();
|
||||
|
||||
let allow_offloaded_children = false;
|
||||
let (timeline, mut guard) = Self::prepare(tenant, timeline_id, allow_offloaded_children)?;
|
||||
let (timeline, mut guard) = Self::prepare(tenant, timeline_id)?;
|
||||
|
||||
guard.mark_in_progress()?;
|
||||
|
||||
@@ -325,7 +340,6 @@ impl DeleteTimelineFlow {
|
||||
pub(super) fn prepare(
|
||||
tenant: &Tenant,
|
||||
timeline_id: TimelineId,
|
||||
allow_offloaded_children: bool,
|
||||
) -> Result<(TimelineOrOffloaded, DeletionGuard), DeleteTimelineError> {
|
||||
// Note the interaction between this guard and deletion guard.
|
||||
// Here we attempt to lock deletion guard when we're holding a lock on timelines.
|
||||
@@ -338,27 +352,30 @@ impl DeleteTimelineFlow {
|
||||
// T1: acquire deletion lock, do another `DeleteTimelineFlow::run`
|
||||
// For more context see this discussion: `https://github.com/neondatabase/neon/pull/4552#discussion_r1253437346`
|
||||
let timelines = tenant.timelines.lock().unwrap();
|
||||
let timelines_offloaded = tenant.timelines_offloaded.lock().unwrap();
|
||||
|
||||
let timeline = match timelines.get(&timeline_id) {
|
||||
Some(t) => TimelineOrOffloaded::Timeline(Arc::clone(t)),
|
||||
None => match timelines_offloaded.get(&timeline_id) {
|
||||
Some(t) => TimelineOrOffloaded::Offloaded(Arc::clone(t)),
|
||||
None => return Err(DeleteTimelineError::NotFound),
|
||||
},
|
||||
None => {
|
||||
let offloaded_timelines = tenant.timelines_offloaded.lock().unwrap();
|
||||
match offloaded_timelines.get(&timeline_id) {
|
||||
Some(t) => TimelineOrOffloaded::Offloaded(Arc::clone(t)),
|
||||
None => return Err(DeleteTimelineError::NotFound),
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
// Ensure that there are no child timelines, because we are about to remove files,
|
||||
// which will break child branches
|
||||
let mut children = Vec::new();
|
||||
if !allow_offloaded_children {
|
||||
children.extend(timelines_offloaded.iter().filter_map(|(id, entry)| {
|
||||
(entry.ancestor_timeline_id == Some(timeline_id)).then_some(*id)
|
||||
}));
|
||||
}
|
||||
children.extend(timelines.iter().filter_map(|(id, entry)| {
|
||||
(entry.get_ancestor_timeline_id() == Some(timeline_id)).then_some(*id)
|
||||
}));
|
||||
// Ensure that there are no child timelines **attached to that pageserver**,
|
||||
// because detach removes files, which will break child branches
|
||||
let children: Vec<TimelineId> = timelines
|
||||
.iter()
|
||||
.filter_map(|(id, entry)| {
|
||||
if entry.get_ancestor_timeline_id() == Some(timeline_id) {
|
||||
Some(*id)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
|
||||
if !children.is_empty() {
|
||||
return Err(DeleteTimelineError::HasChildren(children));
|
||||
@@ -424,20 +441,12 @@ impl DeleteTimelineFlow {
|
||||
timeline: &TimelineOrOffloaded,
|
||||
remote_client: Arc<RemoteTimelineClient>,
|
||||
) -> Result<(), DeleteTimelineError> {
|
||||
fail::fail_point!("timeline-delete-before-rm", |_| {
|
||||
Err(anyhow::anyhow!("failpoint: timeline-delete-before-rm"))?
|
||||
});
|
||||
|
||||
// Offloaded timelines have no local state
|
||||
// TODO: once we persist offloaded information, delete the timeline from there, too
|
||||
if let TimelineOrOffloaded::Timeline(timeline) = timeline {
|
||||
delete_local_timeline_directory(conf, tenant.tenant_shard_id, timeline).await;
|
||||
delete_local_timeline_directory(conf, tenant.tenant_shard_id, timeline).await?;
|
||||
}
|
||||
|
||||
fail::fail_point!("timeline-delete-after-rm", |_| {
|
||||
Err(anyhow::anyhow!("failpoint: timeline-delete-after-rm"))?
|
||||
});
|
||||
|
||||
delete_remote_layers_and_index(&remote_client).await?;
|
||||
|
||||
pausable_failpoint!("in_progress_delete");
|
||||
|
||||
@@ -3,40 +3,16 @@ use std::sync::Arc;
|
||||
use super::delete::{delete_local_timeline_directory, DeleteTimelineFlow, DeletionGuard};
|
||||
use super::Timeline;
|
||||
use crate::span::debug_assert_current_span_has_tenant_and_timeline_id;
|
||||
use crate::tenant::{OffloadedTimeline, Tenant, TenantManifestError, TimelineOrOffloaded};
|
||||
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
pub(crate) enum OffloadError {
|
||||
#[error("Cancelled")]
|
||||
Cancelled,
|
||||
#[error("Timeline is not archived")]
|
||||
NotArchived,
|
||||
#[error(transparent)]
|
||||
RemoteStorage(anyhow::Error),
|
||||
#[error("Unexpected offload error: {0}")]
|
||||
Other(anyhow::Error),
|
||||
}
|
||||
|
||||
impl From<TenantManifestError> for OffloadError {
|
||||
fn from(e: TenantManifestError) -> Self {
|
||||
match e {
|
||||
TenantManifestError::Cancelled => Self::Cancelled,
|
||||
TenantManifestError::RemoteStorage(e) => Self::RemoteStorage(e),
|
||||
}
|
||||
}
|
||||
}
|
||||
use crate::tenant::{OffloadedTimeline, Tenant, TimelineOrOffloaded};
|
||||
|
||||
pub(crate) async fn offload_timeline(
|
||||
tenant: &Tenant,
|
||||
timeline: &Arc<Timeline>,
|
||||
) -> Result<(), OffloadError> {
|
||||
) -> anyhow::Result<()> {
|
||||
debug_assert_current_span_has_tenant_and_timeline_id();
|
||||
tracing::info!("offloading archived timeline");
|
||||
|
||||
let allow_offloaded_children = true;
|
||||
let (timeline, guard) =
|
||||
DeleteTimelineFlow::prepare(tenant, timeline.timeline_id, allow_offloaded_children)
|
||||
.map_err(|e| OffloadError::Other(anyhow::anyhow!(e)))?;
|
||||
let (timeline, guard) = DeleteTimelineFlow::prepare(tenant, timeline.timeline_id)?;
|
||||
|
||||
let TimelineOrOffloaded::Timeline(timeline) = timeline else {
|
||||
tracing::error!("timeline already offloaded, but given timeline object");
|
||||
@@ -48,15 +24,14 @@ pub(crate) async fn offload_timeline(
|
||||
Some(true) => (),
|
||||
Some(false) => {
|
||||
tracing::warn!(?is_archived, "tried offloading a non-archived timeline");
|
||||
return Err(OffloadError::NotArchived);
|
||||
anyhow::bail!("timeline isn't archived");
|
||||
}
|
||||
None => {
|
||||
// This is legal: calls to this function can race with the timeline shutting down
|
||||
tracing::info!(
|
||||
tracing::warn!(
|
||||
?is_archived,
|
||||
"tried offloading a timeline whose remote storage is not initialized"
|
||||
"tried offloading a timeline where manifest is not yet available"
|
||||
);
|
||||
return Err(OffloadError::Cancelled);
|
||||
anyhow::bail!("timeline manifest hasn't been loaded yet");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -67,9 +42,9 @@ pub(crate) async fn offload_timeline(
|
||||
// to make deletions possible while offloading is in progress
|
||||
|
||||
let conf = &tenant.conf;
|
||||
delete_local_timeline_directory(conf, tenant.tenant_shard_id, &timeline).await;
|
||||
delete_local_timeline_directory(conf, tenant.tenant_shard_id, &timeline).await?;
|
||||
|
||||
remove_timeline_from_tenant(tenant, &timeline, &guard);
|
||||
remove_timeline_from_tenant(tenant, &timeline, &guard).await?;
|
||||
|
||||
{
|
||||
let mut offloaded_timelines = tenant.timelines_offloaded.lock().unwrap();
|
||||
@@ -88,18 +63,21 @@ pub(crate) async fn offload_timeline(
|
||||
// at the next restart attach it again.
|
||||
// For that to happen, we'd need to make the manifest reflect our *intended* state,
|
||||
// not our actual state of offloaded timelines.
|
||||
tenant.store_tenant_manifest().await?;
|
||||
tenant
|
||||
.store_tenant_manifest()
|
||||
.await
|
||||
.map_err(|e| anyhow::anyhow!(e))?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// It is important that this gets called when DeletionGuard is being held.
|
||||
/// For more context see comments in [`DeleteTimelineFlow::prepare`]
|
||||
fn remove_timeline_from_tenant(
|
||||
async fn remove_timeline_from_tenant(
|
||||
tenant: &Tenant,
|
||||
timeline: &Timeline,
|
||||
_: &DeletionGuard, // using it as a witness
|
||||
) {
|
||||
) -> anyhow::Result<()> {
|
||||
// Remove the timeline from the map.
|
||||
let mut timelines = tenant.timelines.lock().unwrap();
|
||||
let children_exist = timelines
|
||||
@@ -115,4 +93,8 @@ fn remove_timeline_from_tenant(
|
||||
timelines
|
||||
.remove(&timeline.timeline_id)
|
||||
.expect("timeline that we were deleting was concurrently removed from 'timelines' map");
|
||||
|
||||
drop(timelines);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -141,9 +141,7 @@ impl Drop for UninitializedTimeline<'_> {
|
||||
fn drop(&mut self) {
|
||||
if let Some((_, create_guard)) = self.raw_timeline.take() {
|
||||
let _entered = info_span!("drop_uninitialized_timeline", tenant_id = %self.owning_tenant.tenant_shard_id.tenant_id, shard_id = %self.owning_tenant.tenant_shard_id.shard_slug(), timeline_id = %self.timeline_id).entered();
|
||||
// This is unusual, but can happen harmlessly if the pageserver is stopped while
|
||||
// creating a timeline.
|
||||
info!("Timeline got dropped without initializing, cleaning its files");
|
||||
error!("Timeline got dropped without initializing, cleaning its files");
|
||||
cleanup_timeline_directory(create_guard);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -22,7 +22,6 @@ use tokio::{select, sync::watch, time};
|
||||
use tokio_postgres::{replication::ReplicationStream, Client};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{debug, error, info, trace, warn, Instrument};
|
||||
use wal_decoder::models::{FlushUncommittedRecords, InterpretedWalRecord};
|
||||
|
||||
use super::TaskStateUpdate;
|
||||
use crate::{
|
||||
@@ -36,6 +35,7 @@ use crate::{
|
||||
use postgres_backend::is_expected_io_error;
|
||||
use postgres_connection::PgConnectionConfig;
|
||||
use postgres_ffi::waldecoder::WalStreamDecoder;
|
||||
use postgres_ffi::walrecord::{decode_wal_record, DecodedWALRecord};
|
||||
use utils::{id::NodeId, lsn::Lsn};
|
||||
use utils::{pageserver_feedback::PageserverFeedback, sync::gate::GateError};
|
||||
|
||||
@@ -339,15 +339,11 @@ pub(super) async fn handle_walreceiver_connection(
|
||||
return Err(WalReceiverError::Other(anyhow!("LSN not aligned")));
|
||||
}
|
||||
|
||||
// Deserialize and interpret WAL record
|
||||
let interpreted = InterpretedWalRecord::from_bytes_filtered(
|
||||
recdata,
|
||||
modification.tline.get_shard_identity(),
|
||||
lsn,
|
||||
modification.tline.pg_version,
|
||||
)?;
|
||||
// Deserialize WAL record
|
||||
let mut decoded = DecodedWALRecord::default();
|
||||
decode_wal_record(recdata, &mut decoded, modification.tline.pg_version)?;
|
||||
|
||||
if matches!(interpreted.flush_uncommitted, FlushUncommittedRecords::Yes)
|
||||
if decoded.is_dbase_create_copy(timeline.pg_version)
|
||||
&& uncommitted_records > 0
|
||||
{
|
||||
// Special case: legacy PG database creations operate by reading pages from a 'template' database:
|
||||
@@ -364,7 +360,7 @@ pub(super) async fn handle_walreceiver_connection(
|
||||
|
||||
// Ingest the records without immediately committing them.
|
||||
let ingested = walingest
|
||||
.ingest_record(interpreted, &mut modification, &ctx)
|
||||
.ingest_record(decoded, lsn, &mut modification, &ctx)
|
||||
.await
|
||||
.with_context(|| format!("could not ingest record at {lsn}"))?;
|
||||
if !ingested {
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -67,10 +67,7 @@ pub(crate) fn apply_in_neon(
|
||||
let map = &mut page[pg_constants::MAXALIGN_SIZE_OF_PAGE_HEADER_DATA..];
|
||||
|
||||
map[map_byte as usize] &= !(flags << map_offset);
|
||||
// The page should never be empty, but we're checking it anyway as a precaution, so that if it is empty for some reason anyway, we don't make matters worse by setting the LSN on it.
|
||||
if !postgres_ffi::page_is_new(page) {
|
||||
postgres_ffi::page_set_lsn(page, lsn);
|
||||
}
|
||||
postgres_ffi::page_set_lsn(page, lsn);
|
||||
}
|
||||
|
||||
// Repeat for 'old_heap_blkno', if any
|
||||
@@ -84,10 +81,7 @@ pub(crate) fn apply_in_neon(
|
||||
let map = &mut page[pg_constants::MAXALIGN_SIZE_OF_PAGE_HEADER_DATA..];
|
||||
|
||||
map[map_byte as usize] &= !(flags << map_offset);
|
||||
// The page should never be empty, but we're checking it anyway as a precaution, so that if it is empty for some reason anyway, we don't make matters worse by setting the LSN on it.
|
||||
if !postgres_ffi::page_is_new(page) {
|
||||
postgres_ffi::page_set_lsn(page, lsn);
|
||||
}
|
||||
postgres_ffi::page_set_lsn(page, lsn);
|
||||
}
|
||||
}
|
||||
// Non-relational WAL records are handled here, with custom code that has the
|
||||
|
||||
@@ -74,8 +74,6 @@ sha2 = { workspace = true, features = ["asm", "oid"] }
|
||||
smol_str.workspace = true
|
||||
smallvec.workspace = true
|
||||
socket2.workspace = true
|
||||
strum.workspace = true
|
||||
strum_macros.workspace = true
|
||||
subtle.workspace = true
|
||||
thiserror.workspace = true
|
||||
tikv-jemallocator.workspace = true
|
||||
|
||||
@@ -13,10 +13,9 @@ use itertools::Itertools;
|
||||
use proxy::config::TlsServerEndPoint;
|
||||
use proxy::context::RequestMonitoring;
|
||||
use proxy::metrics::{Metrics, ThreadPoolMetrics};
|
||||
use proxy::protocol2::ConnectionInfo;
|
||||
use proxy::proxy::{copy_bidirectional_client_compute, run_until_cancelled, ErrorSource};
|
||||
use proxy::stream::{PqStream, Stream};
|
||||
use rustls::crypto::ring;
|
||||
use rustls::crypto::aws_lc_rs;
|
||||
use rustls::pki_types::PrivateKeyDer;
|
||||
use tokio::io::{AsyncRead, AsyncWrite};
|
||||
use tokio::net::TcpListener;
|
||||
@@ -106,13 +105,14 @@ async fn main() -> anyhow::Result<()> {
|
||||
let first_cert = cert_chain.first().context("missing certificate")?;
|
||||
let tls_server_end_point = TlsServerEndPoint::new(first_cert)?;
|
||||
|
||||
let tls_config =
|
||||
rustls::ServerConfig::builder_with_provider(Arc::new(ring::default_provider()))
|
||||
.with_protocol_versions(&[&rustls::version::TLS13, &rustls::version::TLS12])
|
||||
.context("ring should support TLS1.2 and TLS1.3")?
|
||||
.with_no_client_auth()
|
||||
.with_single_cert(cert_chain, key)?
|
||||
.into();
|
||||
let tls_config = rustls::ServerConfig::builder_with_provider(Arc::new(
|
||||
aws_lc_rs::default_provider(),
|
||||
))
|
||||
.with_protocol_versions(&[&rustls::version::TLS13, &rustls::version::TLS12])
|
||||
.context("aws_lc_rs should support TLS1.2 and TLS1.3")?
|
||||
.with_no_client_auth()
|
||||
.with_single_cert(cert_chain, key)?
|
||||
.into();
|
||||
|
||||
(tls_config, tls_server_end_point)
|
||||
}
|
||||
@@ -179,10 +179,7 @@ async fn task_main(
|
||||
info!(%peer_addr, "serving");
|
||||
let ctx = RequestMonitoring::new(
|
||||
session_id,
|
||||
ConnectionInfo {
|
||||
addr: peer_addr,
|
||||
extra: None,
|
||||
},
|
||||
peer_addr.ip(),
|
||||
proxy::metrics::Protocol::SniRouter,
|
||||
"sni",
|
||||
);
|
||||
|
||||
@@ -8,7 +8,7 @@ use itertools::Itertools;
|
||||
use once_cell::sync::OnceCell;
|
||||
use pq_proto::StartupMessageParams;
|
||||
use rustls::client::danger::ServerCertVerifier;
|
||||
use rustls::crypto::ring;
|
||||
use rustls::crypto::aws_lc_rs;
|
||||
use rustls::pki_types::InvalidDnsNameError;
|
||||
use thiserror::Error;
|
||||
use tokio::net::TcpStream;
|
||||
@@ -266,12 +266,12 @@ impl ConnCfg {
|
||||
}
|
||||
}
|
||||
|
||||
type RustlsStream = <MakeRustlsConnect as MakeTlsConnect<tokio::net::TcpStream>>::Stream;
|
||||
|
||||
pub(crate) struct PostgresConnection {
|
||||
/// Socket connected to a compute node.
|
||||
pub(crate) stream:
|
||||
tokio_postgres::maybe_tls_stream::MaybeTlsStream<tokio::net::TcpStream, RustlsStream>,
|
||||
pub(crate) stream: tokio_postgres::maybe_tls_stream::MaybeTlsStream<
|
||||
tokio::net::TcpStream,
|
||||
tokio_postgres_rustls::RustlsStream<tokio::net::TcpStream>,
|
||||
>,
|
||||
/// PostgreSQL connection parameters.
|
||||
pub(crate) params: std::collections::HashMap<String, String>,
|
||||
/// Query cancellation token.
|
||||
@@ -298,9 +298,9 @@ impl ConnCfg {
|
||||
let client_config = if allow_self_signed_compute {
|
||||
// Allow all certificates for creating the connection
|
||||
let verifier = Arc::new(AcceptEverythingVerifier);
|
||||
rustls::ClientConfig::builder_with_provider(Arc::new(ring::default_provider()))
|
||||
rustls::ClientConfig::builder_with_provider(Arc::new(aws_lc_rs::default_provider()))
|
||||
.with_safe_default_protocol_versions()
|
||||
.expect("ring should support the default protocol versions")
|
||||
.expect("aws_lc_rs should support the default protocol versions")
|
||||
.dangerous()
|
||||
.with_custom_certificate_verifier(verifier)
|
||||
} else {
|
||||
@@ -308,9 +308,9 @@ impl ConnCfg {
|
||||
.get_or_try_init(load_certs)
|
||||
.map_err(ConnectionError::TlsCertificateError)?
|
||||
.clone();
|
||||
rustls::ClientConfig::builder_with_provider(Arc::new(ring::default_provider()))
|
||||
rustls::ClientConfig::builder_with_provider(Arc::new(aws_lc_rs::default_provider()))
|
||||
.with_safe_default_protocol_versions()
|
||||
.expect("ring should support the default protocol versions")
|
||||
.expect("aws_lc_rs should support the default protocol versions")
|
||||
.with_root_certificates(root_store)
|
||||
};
|
||||
let client_config = client_config.with_no_client_auth();
|
||||
|
||||
@@ -7,7 +7,7 @@ use anyhow::{bail, ensure, Context, Ok};
|
||||
use clap::ValueEnum;
|
||||
use itertools::Itertools;
|
||||
use remote_storage::RemoteStorageConfig;
|
||||
use rustls::crypto::ring::{self, sign};
|
||||
use rustls::crypto::aws_lc_rs::{self, sign};
|
||||
use rustls::pki_types::{CertificateDer, PrivateKeyDer};
|
||||
use sha2::{Digest, Sha256};
|
||||
use tracing::{error, info};
|
||||
@@ -127,9 +127,9 @@ pub fn configure_tls(
|
||||
|
||||
// allow TLS 1.2 to be compatible with older client libraries
|
||||
let mut config =
|
||||
rustls::ServerConfig::builder_with_provider(Arc::new(ring::default_provider()))
|
||||
rustls::ServerConfig::builder_with_provider(Arc::new(aws_lc_rs::default_provider()))
|
||||
.with_protocol_versions(&[&rustls::version::TLS13, &rustls::version::TLS12])
|
||||
.context("ring should support TLS1.2 and TLS1.3")?
|
||||
.context("aws_lc_rs should support TLS1.2 and TLS1.3")?
|
||||
.with_no_client_auth()
|
||||
.with_cert_resolver(cert_resolver.clone());
|
||||
|
||||
|
||||
@@ -11,7 +11,7 @@ use crate::config::{ProxyConfig, ProxyProtocolV2};
|
||||
use crate::context::RequestMonitoring;
|
||||
use crate::error::ReportableError;
|
||||
use crate::metrics::{Metrics, NumClientConnectionsGuard};
|
||||
use crate::protocol2::{read_proxy_protocol, ConnectionInfo};
|
||||
use crate::protocol2::read_proxy_protocol;
|
||||
use crate::proxy::connect_compute::{connect_to_compute, TcpMechanism};
|
||||
use crate::proxy::handshake::{handshake, HandshakeData};
|
||||
use crate::proxy::passthrough::ProxyPassthrough;
|
||||
@@ -65,8 +65,8 @@ pub async fn task_main(
|
||||
error!("proxy protocol header not supported");
|
||||
return;
|
||||
}
|
||||
Ok((socket, Some(info))) => (socket, info),
|
||||
Ok((socket, None)) => (socket, ConnectionInfo{ addr: peer_addr, extra: None }),
|
||||
Ok((socket, Some(addr))) => (socket, addr.ip()),
|
||||
Ok((socket, None)) => (socket, peer_addr.ip()),
|
||||
};
|
||||
|
||||
match socket.inner.set_nodelay(true) {
|
||||
|
||||
@@ -19,7 +19,6 @@ use crate::intern::{BranchIdInt, ProjectIdInt};
|
||||
use crate::metrics::{
|
||||
ConnectOutcome, InvalidEndpointsGroup, LatencyTimer, Metrics, Protocol, Waiting,
|
||||
};
|
||||
use crate::protocol2::ConnectionInfo;
|
||||
use crate::types::{DbName, EndpointId, RoleName};
|
||||
|
||||
pub mod parquet;
|
||||
@@ -41,7 +40,7 @@ pub struct RequestMonitoring(
|
||||
);
|
||||
|
||||
struct RequestMonitoringInner {
|
||||
pub(crate) conn_info: ConnectionInfo,
|
||||
pub(crate) peer_addr: IpAddr,
|
||||
pub(crate) session_id: Uuid,
|
||||
pub(crate) protocol: Protocol,
|
||||
first_packet: chrono::DateTime<Utc>,
|
||||
@@ -85,7 +84,7 @@ impl Clone for RequestMonitoring {
|
||||
fn clone(&self) -> Self {
|
||||
let inner = self.0.try_lock().expect("should not deadlock");
|
||||
let new = RequestMonitoringInner {
|
||||
conn_info: inner.conn_info.clone(),
|
||||
peer_addr: inner.peer_addr,
|
||||
session_id: inner.session_id,
|
||||
protocol: inner.protocol,
|
||||
first_packet: inner.first_packet,
|
||||
@@ -118,7 +117,7 @@ impl Clone for RequestMonitoring {
|
||||
impl RequestMonitoring {
|
||||
pub fn new(
|
||||
session_id: Uuid,
|
||||
conn_info: ConnectionInfo,
|
||||
peer_addr: IpAddr,
|
||||
protocol: Protocol,
|
||||
region: &'static str,
|
||||
) -> Self {
|
||||
@@ -126,13 +125,13 @@ impl RequestMonitoring {
|
||||
"connect_request",
|
||||
%protocol,
|
||||
?session_id,
|
||||
%conn_info,
|
||||
%peer_addr,
|
||||
ep = tracing::field::Empty,
|
||||
role = tracing::field::Empty,
|
||||
);
|
||||
|
||||
let inner = RequestMonitoringInner {
|
||||
conn_info,
|
||||
peer_addr,
|
||||
session_id,
|
||||
protocol,
|
||||
first_packet: Utc::now(),
|
||||
@@ -163,11 +162,7 @@ impl RequestMonitoring {
|
||||
|
||||
#[cfg(test)]
|
||||
pub(crate) fn test() -> Self {
|
||||
use std::net::SocketAddr;
|
||||
let ip = IpAddr::from([127, 0, 0, 1]);
|
||||
let addr = SocketAddr::new(ip, 5432);
|
||||
let conn_info = ConnectionInfo { addr, extra: None };
|
||||
RequestMonitoring::new(Uuid::now_v7(), conn_info, Protocol::Tcp, "test")
|
||||
RequestMonitoring::new(Uuid::now_v7(), [127, 0, 0, 1].into(), Protocol::Tcp, "test")
|
||||
}
|
||||
|
||||
pub(crate) fn console_application_name(&self) -> String {
|
||||
@@ -291,12 +286,7 @@ impl RequestMonitoring {
|
||||
}
|
||||
|
||||
pub(crate) fn peer_addr(&self) -> IpAddr {
|
||||
self.0
|
||||
.try_lock()
|
||||
.expect("should not deadlock")
|
||||
.conn_info
|
||||
.addr
|
||||
.ip()
|
||||
self.0.try_lock().expect("should not deadlock").peer_addr
|
||||
}
|
||||
|
||||
pub(crate) fn cold_start_info(&self) -> ColdStartInfo {
|
||||
@@ -372,7 +362,7 @@ impl RequestMonitoringInner {
|
||||
}
|
||||
|
||||
fn has_private_peer_addr(&self) -> bool {
|
||||
match self.conn_info.addr.ip() {
|
||||
match self.peer_addr {
|
||||
IpAddr::V4(ip) => ip.is_private(),
|
||||
IpAddr::V6(_) => false,
|
||||
}
|
||||
|
||||
@@ -121,7 +121,7 @@ impl From<&RequestMonitoringInner> for RequestData {
|
||||
fn from(value: &RequestMonitoringInner) -> Self {
|
||||
Self {
|
||||
session_id: value.session_id,
|
||||
peer_addr: value.conn_info.addr.ip().to_string(),
|
||||
peer_addr: value.peer_addr.to_string(),
|
||||
timestamp: value.first_packet.naive_utc(),
|
||||
username: value.user.as_deref().map(String::from),
|
||||
application_name: value.application.as_deref().map(String::from),
|
||||
|
||||
@@ -1,15 +1,12 @@
|
||||
//! Proxy Protocol V2 implementation
|
||||
//! Compatible with <https://www.haproxy.org/download/3.1/doc/proxy-protocol.txt>
|
||||
|
||||
use core::fmt;
|
||||
use std::io;
|
||||
use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr};
|
||||
use std::net::SocketAddr;
|
||||
use std::pin::Pin;
|
||||
use std::task::{Context, Poll};
|
||||
|
||||
use bytes::{Buf, Bytes, BytesMut};
|
||||
use bytes::BytesMut;
|
||||
use pin_project_lite::pin_project;
|
||||
use strum_macros::FromRepr;
|
||||
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, ReadBuf};
|
||||
|
||||
pin_project! {
|
||||
@@ -61,35 +58,9 @@ const HEADER: [u8; 12] = [
|
||||
0x0D, 0x0A, 0x0D, 0x0A, 0x00, 0x0D, 0x0A, 0x51, 0x55, 0x49, 0x54, 0x0A,
|
||||
];
|
||||
|
||||
#[derive(PartialEq, Eq, Clone, Debug)]
|
||||
pub struct ConnectionInfo {
|
||||
pub addr: SocketAddr,
|
||||
pub extra: Option<ConnectionInfoExtra>,
|
||||
}
|
||||
|
||||
impl fmt::Display for ConnectionInfo {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
match &self.extra {
|
||||
None => self.addr.ip().fmt(f),
|
||||
Some(ConnectionInfoExtra::Aws { vpce_id }) => {
|
||||
write!(f, "vpce_id[{vpce_id:?}]:addr[{}]", self.addr.ip())
|
||||
}
|
||||
Some(ConnectionInfoExtra::Azure { link_id }) => {
|
||||
write!(f, "link_id[{link_id}]:addr[{}]", self.addr.ip())
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(PartialEq, Eq, Clone, Debug)]
|
||||
pub enum ConnectionInfoExtra {
|
||||
Aws { vpce_id: Bytes },
|
||||
Azure { link_id: u32 },
|
||||
}
|
||||
|
||||
pub(crate) async fn read_proxy_protocol<T: AsyncRead + Unpin>(
|
||||
mut read: T,
|
||||
) -> std::io::Result<(ChainRW<T>, Option<ConnectionInfo>)> {
|
||||
) -> std::io::Result<(ChainRW<T>, Option<SocketAddr>)> {
|
||||
let mut buf = BytesMut::with_capacity(128);
|
||||
while buf.len() < 16 {
|
||||
let bytes_read = read.read_buf(&mut buf).await?;
|
||||
@@ -193,107 +164,22 @@ pub(crate) async fn read_proxy_protocol<T: AsyncRead + Unpin>(
|
||||
// - destination layer 3 address in network byte order
|
||||
// - source layer 4 address if any, in network byte order (port)
|
||||
// - destination layer 4 address if any, in network byte order (port)
|
||||
let mut header = buf.split_to(usize::from(remaining_length));
|
||||
let mut addr = header.split_to(usize::from(address_length));
|
||||
let socket = match addr.len() {
|
||||
let addresses = buf.split_to(remaining_length as usize);
|
||||
let socket = match address_length {
|
||||
12 => {
|
||||
let src_addr = Ipv4Addr::from_bits(addr.get_u32());
|
||||
let _dst_addr = Ipv4Addr::from_bits(addr.get_u32());
|
||||
let src_port = addr.get_u16();
|
||||
let _dst_port = addr.get_u16();
|
||||
let src_addr: [u8; 4] = addresses[0..4].try_into().unwrap();
|
||||
let src_port = u16::from_be_bytes(addresses[8..10].try_into().unwrap());
|
||||
Some(SocketAddr::from((src_addr, src_port)))
|
||||
}
|
||||
36 => {
|
||||
let src_addr = Ipv6Addr::from_bits(addr.get_u128());
|
||||
let _dst_addr = Ipv6Addr::from_bits(addr.get_u128());
|
||||
let src_port = addr.get_u16();
|
||||
let _dst_port = addr.get_u16();
|
||||
let src_addr: [u8; 16] = addresses[0..16].try_into().unwrap();
|
||||
let src_port = u16::from_be_bytes(addresses[32..34].try_into().unwrap());
|
||||
Some(SocketAddr::from((src_addr, src_port)))
|
||||
}
|
||||
_ => None,
|
||||
};
|
||||
|
||||
let mut extra = None;
|
||||
|
||||
while let Some(mut tlv) = read_tlv(&mut header) {
|
||||
match Pp2Kind::from_repr(tlv.kind) {
|
||||
Some(Pp2Kind::Aws) => {
|
||||
if tlv.value.is_empty() {
|
||||
tracing::warn!("invalid aws tlv: no subtype");
|
||||
}
|
||||
let subtype = tlv.value.get_u8();
|
||||
match Pp2AwsType::from_repr(subtype) {
|
||||
Some(Pp2AwsType::VpceId) => {
|
||||
extra = Some(ConnectionInfoExtra::Aws { vpce_id: tlv.value });
|
||||
}
|
||||
None => {
|
||||
tracing::warn!("unknown aws tlv: subtype={subtype}");
|
||||
}
|
||||
}
|
||||
}
|
||||
Some(Pp2Kind::Azure) => {
|
||||
if tlv.value.is_empty() {
|
||||
tracing::warn!("invalid azure tlv: no subtype");
|
||||
}
|
||||
let subtype = tlv.value.get_u8();
|
||||
match Pp2AzureType::from_repr(subtype) {
|
||||
Some(Pp2AzureType::PrivateEndpointLinkId) => {
|
||||
if tlv.value.len() != 4 {
|
||||
tracing::warn!("invalid azure link_id: {:?}", tlv.value);
|
||||
}
|
||||
extra = Some(ConnectionInfoExtra::Azure {
|
||||
link_id: tlv.value.get_u32_le(),
|
||||
});
|
||||
}
|
||||
None => {
|
||||
tracing::warn!("unknown azure tlv: subtype={subtype}");
|
||||
}
|
||||
}
|
||||
}
|
||||
Some(kind) => {
|
||||
tracing::debug!("unused tlv[{kind:?}]: {:?}", tlv.value);
|
||||
}
|
||||
None => {
|
||||
tracing::debug!("unknown tlv: {tlv:?}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let conn_info = socket.map(|addr| ConnectionInfo { addr, extra });
|
||||
|
||||
Ok((ChainRW { inner: read, buf }, conn_info))
|
||||
}
|
||||
|
||||
#[derive(FromRepr, Debug, Copy, Clone)]
|
||||
#[repr(u8)]
|
||||
enum Pp2Kind {
|
||||
// The following are defined by https://www.haproxy.org/download/3.1/doc/proxy-protocol.txt
|
||||
// we don't use these but it would be interesting to know what's available
|
||||
Alpn = 0x01,
|
||||
Authority = 0x02,
|
||||
Crc32C = 0x03,
|
||||
Noop = 0x04,
|
||||
UniqueId = 0x05,
|
||||
Ssl = 0x20,
|
||||
NetNs = 0x30,
|
||||
|
||||
/// <https://docs.aws.amazon.com/elasticloadbalancing/latest/network/edit-target-group-attributes.html#proxy-protocol>
|
||||
Aws = 0xEA,
|
||||
|
||||
/// <https://learn.microsoft.com/en-us/azure/private-link/private-link-service-overview#getting-connection-information-using-tcp-proxy-v2>
|
||||
Azure = 0xEE,
|
||||
}
|
||||
|
||||
#[derive(FromRepr, Debug, Copy, Clone)]
|
||||
#[repr(u8)]
|
||||
enum Pp2AwsType {
|
||||
VpceId = 0x01,
|
||||
}
|
||||
|
||||
#[derive(FromRepr, Debug, Copy, Clone)]
|
||||
#[repr(u8)]
|
||||
enum Pp2AzureType {
|
||||
PrivateEndpointLinkId = 0x01,
|
||||
Ok((ChainRW { inner: read, buf }, socket))
|
||||
}
|
||||
|
||||
impl<T: AsyncRead> AsyncRead for ChainRW<T> {
|
||||
@@ -330,25 +216,6 @@ impl<T: AsyncRead> ChainRW<T> {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct Tlv {
|
||||
kind: u8,
|
||||
value: Bytes,
|
||||
}
|
||||
|
||||
fn read_tlv(b: &mut BytesMut) -> Option<Tlv> {
|
||||
if b.len() < 3 {
|
||||
return None;
|
||||
}
|
||||
let kind = b.get_u8();
|
||||
let len = usize::from(b.get_u16());
|
||||
if b.len() < len {
|
||||
return None;
|
||||
}
|
||||
let value = b.split_to(len).freeze();
|
||||
Some(Tlv { kind, value })
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use tokio::io::AsyncReadExt;
|
||||
@@ -375,7 +242,7 @@ mod tests {
|
||||
|
||||
let extra_data = [0x55; 256];
|
||||
|
||||
let (mut read, info) = read_proxy_protocol(header.chain(extra_data.as_slice()))
|
||||
let (mut read, addr) = read_proxy_protocol(header.chain(extra_data.as_slice()))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
@@ -383,9 +250,7 @@ mod tests {
|
||||
read.read_to_end(&mut bytes).await.unwrap();
|
||||
|
||||
assert_eq!(bytes, extra_data);
|
||||
|
||||
let info = info.unwrap();
|
||||
assert_eq!(info.addr, ([127, 0, 0, 1], 65535).into());
|
||||
assert_eq!(addr, Some(([127, 0, 0, 1], 65535).into()));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
@@ -408,7 +273,7 @@ mod tests {
|
||||
|
||||
let extra_data = [0x55; 256];
|
||||
|
||||
let (mut read, info) = read_proxy_protocol(header.chain(extra_data.as_slice()))
|
||||
let (mut read, addr) = read_proxy_protocol(header.chain(extra_data.as_slice()))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
@@ -416,11 +281,9 @@ mod tests {
|
||||
read.read_to_end(&mut bytes).await.unwrap();
|
||||
|
||||
assert_eq!(bytes, extra_data);
|
||||
|
||||
let info = info.unwrap();
|
||||
assert_eq!(
|
||||
info.addr,
|
||||
([15, 14, 13, 12, 11, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0], 257).into()
|
||||
addr,
|
||||
Some(([15, 14, 13, 12, 11, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0], 257).into())
|
||||
);
|
||||
}
|
||||
|
||||
@@ -428,31 +291,30 @@ mod tests {
|
||||
async fn test_invalid() {
|
||||
let data = [0x55; 256];
|
||||
|
||||
let (mut read, info) = read_proxy_protocol(data.as_slice()).await.unwrap();
|
||||
let (mut read, addr) = read_proxy_protocol(data.as_slice()).await.unwrap();
|
||||
|
||||
let mut bytes = vec![];
|
||||
read.read_to_end(&mut bytes).await.unwrap();
|
||||
assert_eq!(bytes, data);
|
||||
assert_eq!(info, None);
|
||||
assert_eq!(addr, None);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_short() {
|
||||
let data = [0x55; 10];
|
||||
|
||||
let (mut read, info) = read_proxy_protocol(data.as_slice()).await.unwrap();
|
||||
let (mut read, addr) = read_proxy_protocol(data.as_slice()).await.unwrap();
|
||||
|
||||
let mut bytes = vec![];
|
||||
read.read_to_end(&mut bytes).await.unwrap();
|
||||
assert_eq!(bytes, data);
|
||||
assert_eq!(info, None);
|
||||
assert_eq!(addr, None);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_large_tlv() {
|
||||
let tlv = vec![0x55; 32768];
|
||||
let tlv_len = (tlv.len() as u16).to_be_bytes();
|
||||
let len = (12 + 3 + tlv.len() as u16).to_be_bytes();
|
||||
let len = (12 + tlv.len() as u16).to_be_bytes();
|
||||
|
||||
let header = super::HEADER
|
||||
// Proxy command, Inet << 4 | Stream
|
||||
@@ -468,13 +330,11 @@ mod tests {
|
||||
// dst port
|
||||
.chain([1, 1].as_slice())
|
||||
// TLV
|
||||
.chain([255].as_slice())
|
||||
.chain(tlv_len.as_slice())
|
||||
.chain(tlv.as_slice());
|
||||
|
||||
let extra_data = [0xaa; 256];
|
||||
|
||||
let (mut read, info) = read_proxy_protocol(header.chain(extra_data.as_slice()))
|
||||
let (mut read, addr) = read_proxy_protocol(header.chain(extra_data.as_slice()))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
@@ -482,8 +342,6 @@ mod tests {
|
||||
read.read_to_end(&mut bytes).await.unwrap();
|
||||
|
||||
assert_eq!(bytes, extra_data);
|
||||
|
||||
let info = info.unwrap();
|
||||
assert_eq!(info.addr, ([55, 56, 57, 58], 65535).into());
|
||||
assert_eq!(addr, Some(([55, 56, 57, 58], 65535).into()));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -28,7 +28,7 @@ use crate::config::{ProxyConfig, ProxyProtocolV2, TlsConfig};
|
||||
use crate::context::RequestMonitoring;
|
||||
use crate::error::ReportableError;
|
||||
use crate::metrics::{Metrics, NumClientConnectionsGuard};
|
||||
use crate::protocol2::{read_proxy_protocol, ConnectionInfo};
|
||||
use crate::protocol2::read_proxy_protocol;
|
||||
use crate::proxy::handshake::{handshake, HandshakeData};
|
||||
use crate::rate_limiter::EndpointRateLimiter;
|
||||
use crate::stream::{PqStream, Stream};
|
||||
@@ -87,7 +87,7 @@ pub async fn task_main(
|
||||
let endpoint_rate_limiter2 = endpoint_rate_limiter.clone();
|
||||
|
||||
connections.spawn(async move {
|
||||
let (socket, conn_info) = match read_proxy_protocol(socket).await {
|
||||
let (socket, peer_addr) = match read_proxy_protocol(socket).await {
|
||||
Err(e) => {
|
||||
warn!("per-client task finished with an error: {e:#}");
|
||||
return;
|
||||
@@ -100,8 +100,8 @@ pub async fn task_main(
|
||||
warn!("proxy protocol header not supported");
|
||||
return;
|
||||
}
|
||||
Ok((socket, Some(info))) => (socket, info),
|
||||
Ok((socket, None)) => (socket, ConnectionInfo { addr: peer_addr, extra: None }),
|
||||
Ok((socket, Some(addr))) => (socket, addr.ip()),
|
||||
Ok((socket, None)) => (socket, peer_addr.ip()),
|
||||
};
|
||||
|
||||
match socket.inner.set_nodelay(true) {
|
||||
@@ -114,7 +114,7 @@ pub async fn task_main(
|
||||
|
||||
let ctx = RequestMonitoring::new(
|
||||
session_id,
|
||||
conn_info,
|
||||
peer_addr,
|
||||
crate::metrics::Protocol::Tcp,
|
||||
&config.region,
|
||||
);
|
||||
|
||||
@@ -9,12 +9,11 @@ use async_trait::async_trait;
|
||||
use http::StatusCode;
|
||||
use retry::{retry_after, ShouldRetryWakeCompute};
|
||||
use rstest::rstest;
|
||||
use rustls::crypto::ring;
|
||||
use rustls::crypto::aws_lc_rs;
|
||||
use rustls::pki_types;
|
||||
use tokio::io::DuplexStream;
|
||||
use tokio_postgres::config::SslMode;
|
||||
use tokio_postgres::tls::{MakeTlsConnect, NoTls};
|
||||
use tokio_postgres_rustls::MakeRustlsConnect;
|
||||
use tokio_postgres_rustls::{MakeRustlsConnect, RustlsStream};
|
||||
|
||||
use super::connect_compute::ConnectMechanism;
|
||||
use super::retry::CouldRetry;
|
||||
@@ -70,12 +69,19 @@ struct ClientConfig<'a> {
|
||||
hostname: &'a str,
|
||||
}
|
||||
|
||||
type TlsConnect<S> = <MakeRustlsConnect as MakeTlsConnect<S>>::TlsConnect;
|
||||
|
||||
impl ClientConfig<'_> {
|
||||
fn make_tls_connect(self) -> anyhow::Result<TlsConnect<DuplexStream>> {
|
||||
fn make_tls_connect<S: AsyncRead + AsyncWrite + Unpin + Send + 'static>(
|
||||
self,
|
||||
) -> anyhow::Result<
|
||||
impl tokio_postgres::tls::TlsConnect<
|
||||
S,
|
||||
Error = impl std::fmt::Debug + use<S>,
|
||||
Future = impl Send + use<S>,
|
||||
Stream = RustlsStream<S>,
|
||||
> + use<S>,
|
||||
> {
|
||||
let mut mk = MakeRustlsConnect::new(self.config);
|
||||
let tls = MakeTlsConnect::<DuplexStream>::make_tls_connect(&mut mk, self.hostname)?;
|
||||
let tls = MakeTlsConnect::<S>::make_tls_connect(&mut mk, self.hostname)?;
|
||||
Ok(tls)
|
||||
}
|
||||
}
|
||||
@@ -89,9 +95,9 @@ fn generate_tls_config<'a>(
|
||||
|
||||
let tls_config = {
|
||||
let config =
|
||||
rustls::ServerConfig::builder_with_provider(Arc::new(ring::default_provider()))
|
||||
rustls::ServerConfig::builder_with_provider(Arc::new(aws_lc_rs::default_provider()))
|
||||
.with_safe_default_protocol_versions()
|
||||
.context("ring should support the default protocol versions")?
|
||||
.context("aws_lc_rs should support the default protocol versions")?
|
||||
.with_no_client_auth()
|
||||
.with_single_cert(vec![cert.clone()], key.clone_key())?
|
||||
.into();
|
||||
@@ -110,9 +116,9 @@ fn generate_tls_config<'a>(
|
||||
|
||||
let client_config = {
|
||||
let config =
|
||||
rustls::ClientConfig::builder_with_provider(Arc::new(ring::default_provider()))
|
||||
rustls::ClientConfig::builder_with_provider(Arc::new(aws_lc_rs::default_provider()))
|
||||
.with_safe_default_protocol_versions()
|
||||
.context("ring should support the default protocol versions")?
|
||||
.context("aws_lc_rs should support the default protocol versions")?
|
||||
.with_root_certificates({
|
||||
let mut store = rustls::RootCertStore::empty();
|
||||
store.add(ca)?;
|
||||
|
||||
@@ -294,11 +294,6 @@ pub(crate) fn poll_http2_client(
|
||||
conn_id,
|
||||
aux: aux.clone(),
|
||||
});
|
||||
Metrics::get()
|
||||
.proxy
|
||||
.http_pool_opened_connections
|
||||
.get_metric()
|
||||
.inc();
|
||||
|
||||
Arc::downgrade(&pool)
|
||||
}
|
||||
@@ -311,7 +306,7 @@ pub(crate) fn poll_http2_client(
|
||||
let res = connection.await;
|
||||
match res {
|
||||
Ok(()) => info!("connection closed"),
|
||||
Err(e) => error!(%session_id, "connection error: {e:?}"),
|
||||
Err(e) => error!(%session_id, "connection error: {}", e),
|
||||
}
|
||||
|
||||
// remove from connection pool
|
||||
|
||||
@@ -44,10 +44,10 @@ use tracing::{info, warn, Instrument};
|
||||
use utils::http::error::ApiError;
|
||||
|
||||
use crate::cancellation::CancellationHandlerMain;
|
||||
use crate::config::{ProxyConfig, ProxyProtocolV2};
|
||||
use crate::config::ProxyConfig;
|
||||
use crate::context::RequestMonitoring;
|
||||
use crate::metrics::Metrics;
|
||||
use crate::protocol2::{read_proxy_protocol, ChainRW, ConnectionInfo};
|
||||
use crate::protocol2::{read_proxy_protocol, ChainRW};
|
||||
use crate::proxy::run_until_cancelled;
|
||||
use crate::rate_limiter::EndpointRateLimiter;
|
||||
use crate::serverless::backend::PoolingBackend;
|
||||
@@ -180,7 +180,7 @@ pub async fn task_main(
|
||||
peer_addr,
|
||||
))
|
||||
.await;
|
||||
let Some((conn, conn_info)) = startup_result else {
|
||||
let Some((conn, peer_addr)) = startup_result else {
|
||||
return;
|
||||
};
|
||||
|
||||
@@ -192,7 +192,7 @@ pub async fn task_main(
|
||||
endpoint_rate_limiter,
|
||||
conn_token,
|
||||
conn,
|
||||
conn_info,
|
||||
peer_addr,
|
||||
session_id,
|
||||
))
|
||||
.await;
|
||||
@@ -240,7 +240,7 @@ async fn connection_startup(
|
||||
session_id: uuid::Uuid,
|
||||
conn: TcpStream,
|
||||
peer_addr: SocketAddr,
|
||||
) -> Option<(AsyncRW, ConnectionInfo)> {
|
||||
) -> Option<(AsyncRW, IpAddr)> {
|
||||
// handle PROXY protocol
|
||||
let (conn, peer) = match read_proxy_protocol(conn).await {
|
||||
Ok(c) => c,
|
||||
@@ -250,32 +250,17 @@ async fn connection_startup(
|
||||
}
|
||||
};
|
||||
|
||||
let conn_info = match peer {
|
||||
None if config.proxy_protocol_v2 == ProxyProtocolV2::Required => {
|
||||
tracing::warn!("missing required proxy protocol header");
|
||||
return None;
|
||||
}
|
||||
Some(_) if config.proxy_protocol_v2 == ProxyProtocolV2::Rejected => {
|
||||
tracing::warn!("proxy protocol header not supported");
|
||||
return None;
|
||||
}
|
||||
Some(info) => info,
|
||||
None => ConnectionInfo {
|
||||
addr: peer_addr,
|
||||
extra: None,
|
||||
},
|
||||
};
|
||||
|
||||
let has_private_peer_addr = match conn_info.addr.ip() {
|
||||
let peer_addr = peer.unwrap_or(peer_addr).ip();
|
||||
let has_private_peer_addr = match peer_addr {
|
||||
IpAddr::V4(ip) => ip.is_private(),
|
||||
IpAddr::V6(_) => false,
|
||||
};
|
||||
info!(?session_id, %conn_info, "accepted new TCP connection");
|
||||
info!(?session_id, %peer_addr, "accepted new TCP connection");
|
||||
|
||||
// try upgrade to TLS, but with a timeout.
|
||||
let conn = match timeout(config.handshake_timeout, tls_acceptor.accept(conn)).await {
|
||||
Ok(Ok(conn)) => {
|
||||
info!(?session_id, %conn_info, "accepted new TLS connection");
|
||||
info!(?session_id, %peer_addr, "accepted new TLS connection");
|
||||
conn
|
||||
}
|
||||
// The handshake failed
|
||||
@@ -283,7 +268,7 @@ async fn connection_startup(
|
||||
if !has_private_peer_addr {
|
||||
Metrics::get().proxy.tls_handshake_failures.inc();
|
||||
}
|
||||
warn!(?session_id, %conn_info, "failed to accept TLS connection: {e:?}");
|
||||
warn!(?session_id, %peer_addr, "failed to accept TLS connection: {e:?}");
|
||||
return None;
|
||||
}
|
||||
// The handshake timed out
|
||||
@@ -291,12 +276,12 @@ async fn connection_startup(
|
||||
if !has_private_peer_addr {
|
||||
Metrics::get().proxy.tls_handshake_failures.inc();
|
||||
}
|
||||
warn!(?session_id, %conn_info, "failed to accept TLS connection: {e:?}");
|
||||
warn!(?session_id, %peer_addr, "failed to accept TLS connection: {e:?}");
|
||||
return None;
|
||||
}
|
||||
};
|
||||
|
||||
Some((conn, conn_info))
|
||||
Some((conn, peer_addr))
|
||||
}
|
||||
|
||||
/// Handles HTTP connection
|
||||
@@ -312,7 +297,7 @@ async fn connection_handler(
|
||||
endpoint_rate_limiter: Arc<EndpointRateLimiter>,
|
||||
cancellation_token: CancellationToken,
|
||||
conn: AsyncRW,
|
||||
conn_info: ConnectionInfo,
|
||||
peer_addr: IpAddr,
|
||||
session_id: uuid::Uuid,
|
||||
) {
|
||||
let session_id = AtomicTake::new(session_id);
|
||||
@@ -321,7 +306,6 @@ async fn connection_handler(
|
||||
let http_cancellation_token = CancellationToken::new();
|
||||
let _cancel_connection = http_cancellation_token.clone().drop_guard();
|
||||
|
||||
let conn_info2 = conn_info.clone();
|
||||
let server = Builder::new(TokioExecutor::new());
|
||||
let conn = server.serve_connection_with_upgrades(
|
||||
hyper_util::rt::TokioIo::new(conn),
|
||||
@@ -356,7 +340,7 @@ async fn connection_handler(
|
||||
connections.clone(),
|
||||
cancellation_handler.clone(),
|
||||
session_id,
|
||||
conn_info2.clone(),
|
||||
peer_addr,
|
||||
http_request_token,
|
||||
endpoint_rate_limiter.clone(),
|
||||
)
|
||||
@@ -381,7 +365,7 @@ async fn connection_handler(
|
||||
// On cancellation, trigger the HTTP connection handler to shut down.
|
||||
let res = match select(pin!(cancellation_token.cancelled()), pin!(conn)).await {
|
||||
Either::Left((_cancelled, mut conn)) => {
|
||||
tracing::debug!(%conn_info, "cancelling connection");
|
||||
tracing::debug!(%peer_addr, "cancelling connection");
|
||||
conn.as_mut().graceful_shutdown();
|
||||
conn.await
|
||||
}
|
||||
@@ -389,8 +373,8 @@ async fn connection_handler(
|
||||
};
|
||||
|
||||
match res {
|
||||
Ok(()) => tracing::info!(%conn_info, "HTTP connection closed"),
|
||||
Err(e) => tracing::warn!(%conn_info, "HTTP connection error {e}"),
|
||||
Ok(()) => tracing::info!(%peer_addr, "HTTP connection closed"),
|
||||
Err(e) => tracing::warn!(%peer_addr, "HTTP connection error {e}"),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -402,7 +386,7 @@ async fn request_handler(
|
||||
ws_connections: TaskTracker,
|
||||
cancellation_handler: Arc<CancellationHandlerMain>,
|
||||
session_id: uuid::Uuid,
|
||||
conn_info: ConnectionInfo,
|
||||
peer_addr: IpAddr,
|
||||
// used to cancel in-flight HTTP requests. not used to cancel websockets
|
||||
http_cancellation_token: CancellationToken,
|
||||
endpoint_rate_limiter: Arc<EndpointRateLimiter>,
|
||||
@@ -420,7 +404,7 @@ async fn request_handler(
|
||||
{
|
||||
let ctx = RequestMonitoring::new(
|
||||
session_id,
|
||||
conn_info,
|
||||
peer_addr,
|
||||
crate::metrics::Protocol::Ws,
|
||||
&config.region,
|
||||
);
|
||||
@@ -455,7 +439,7 @@ async fn request_handler(
|
||||
} else if request.uri().path() == "/sql" && *request.method() == Method::POST {
|
||||
let ctx = RequestMonitoring::new(
|
||||
session_id,
|
||||
conn_info,
|
||||
peer_addr,
|
||||
crate::metrics::Protocol::Http,
|
||||
&config.region,
|
||||
);
|
||||
|
||||
@@ -67,7 +67,7 @@ exclude = [
|
||||
check_untyped_defs = true
|
||||
# Help mypy find imports when running against list of individual files.
|
||||
# Without this line it would behave differently when executed on the entire project.
|
||||
mypy_path = "$MYPY_CONFIG_FILE_DIR:$MYPY_CONFIG_FILE_DIR/test_runner:$MYPY_CONFIG_FILE_DIR/test_runner/stubs"
|
||||
mypy_path = "$MYPY_CONFIG_FILE_DIR:$MYPY_CONFIG_FILE_DIR/test_runner"
|
||||
|
||||
disallow_incomplete_defs = false
|
||||
disallow_untyped_calls = false
|
||||
|
||||
@@ -5,23 +5,23 @@ use std::{
|
||||
time::{Instant, SystemTime},
|
||||
};
|
||||
|
||||
use ::metrics::{register_histogram, GaugeVec, Histogram, IntGauge, DISK_FSYNC_SECONDS_BUCKETS};
|
||||
use anyhow::Result;
|
||||
use futures::Future;
|
||||
use metrics::{
|
||||
core::{AtomicU64, Collector, Desc, GenericCounter, GenericGaugeVec, Opts},
|
||||
pow2_buckets,
|
||||
proto::MetricFamily,
|
||||
register_histogram, register_histogram_vec, register_int_counter, register_int_counter_pair,
|
||||
register_int_counter_pair_vec, register_int_counter_vec, register_int_gauge, Gauge, GaugeVec,
|
||||
Histogram, HistogramVec, IntCounter, IntCounterPair, IntCounterPairVec, IntCounterVec,
|
||||
IntGauge, IntGaugeVec, DISK_FSYNC_SECONDS_BUCKETS,
|
||||
register_histogram_vec, register_int_counter, register_int_counter_pair,
|
||||
register_int_counter_pair_vec, register_int_counter_vec, register_int_gauge, Gauge,
|
||||
HistogramVec, IntCounter, IntCounterPair, IntCounterPairVec, IntCounterVec, IntGaugeVec,
|
||||
};
|
||||
use once_cell::sync::Lazy;
|
||||
|
||||
use postgres_ffi::XLogSegNo;
|
||||
use utils::{id::TenantTimelineId, lsn::Lsn, pageserver_feedback::PageserverFeedback};
|
||||
use utils::pageserver_feedback::PageserverFeedback;
|
||||
use utils::{id::TenantTimelineId, lsn::Lsn};
|
||||
|
||||
use crate::{
|
||||
receive_wal::MSG_QUEUE_SIZE,
|
||||
state::{TimelineMemState, TimelinePersistentState},
|
||||
GlobalTimelines,
|
||||
};
|
||||
@@ -204,44 +204,6 @@ pub static WAL_BACKUP_TASKS: Lazy<IntCounterPair> = Lazy::new(|| {
|
||||
)
|
||||
.expect("Failed to register safekeeper_wal_backup_tasks_finished_total counter")
|
||||
});
|
||||
pub static WAL_RECEIVERS: Lazy<IntGauge> = Lazy::new(|| {
|
||||
register_int_gauge!(
|
||||
"safekeeper_wal_receivers",
|
||||
"Number of currently connected WAL receivers (i.e. connected computes)"
|
||||
)
|
||||
.expect("Failed to register safekeeper_wal_receivers")
|
||||
});
|
||||
pub static WAL_RECEIVER_QUEUE_DEPTH: Lazy<Histogram> = Lazy::new(|| {
|
||||
// Use powers of two buckets, but add a bucket at 0 and the max queue size to track empty and
|
||||
// full queues respectively.
|
||||
let mut buckets = pow2_buckets(1, MSG_QUEUE_SIZE);
|
||||
buckets.insert(0, 0.0);
|
||||
buckets.insert(buckets.len() - 1, (MSG_QUEUE_SIZE - 1) as f64);
|
||||
assert!(buckets.len() <= 12, "too many histogram buckets");
|
||||
|
||||
register_histogram!(
|
||||
"safekeeper_wal_receiver_queue_depth",
|
||||
"Number of queued messages per WAL receiver (sampled every 5 seconds)",
|
||||
buckets
|
||||
)
|
||||
.expect("Failed to register safekeeper_wal_receiver_queue_depth histogram")
|
||||
});
|
||||
pub static WAL_RECEIVER_QUEUE_DEPTH_TOTAL: Lazy<IntGauge> = Lazy::new(|| {
|
||||
register_int_gauge!(
|
||||
"safekeeper_wal_receiver_queue_depth_total",
|
||||
"Total number of queued messages across all WAL receivers",
|
||||
)
|
||||
.expect("Failed to register safekeeper_wal_receiver_queue_depth_total gauge")
|
||||
});
|
||||
// TODO: consider adding a per-receiver queue_size histogram. This will require wrapping the Tokio
|
||||
// MPSC channel to update counters on send, receive, and drop, while forwarding all other methods.
|
||||
pub static WAL_RECEIVER_QUEUE_SIZE_TOTAL: Lazy<IntGauge> = Lazy::new(|| {
|
||||
register_int_gauge!(
|
||||
"safekeeper_wal_receiver_queue_size_total",
|
||||
"Total memory byte size of queued messages across all WAL receivers",
|
||||
)
|
||||
.expect("Failed to register safekeeper_wal_receiver_queue_size_total gauge")
|
||||
});
|
||||
|
||||
// Metrics collected on operations on the storage repository.
|
||||
#[derive(strum_macros::EnumString, strum_macros::Display, strum_macros::IntoStaticStr)]
|
||||
|
||||
@@ -3,10 +3,6 @@
|
||||
//! sends replies back.
|
||||
|
||||
use crate::handler::SafekeeperPostgresHandler;
|
||||
use crate::metrics::{
|
||||
WAL_RECEIVERS, WAL_RECEIVER_QUEUE_DEPTH, WAL_RECEIVER_QUEUE_DEPTH_TOTAL,
|
||||
WAL_RECEIVER_QUEUE_SIZE_TOTAL,
|
||||
};
|
||||
use crate::safekeeper::AcceptorProposerMessage;
|
||||
use crate::safekeeper::ProposerAcceptorMessage;
|
||||
use crate::safekeeper::ServerInfo;
|
||||
@@ -30,11 +26,10 @@ use std::net::SocketAddr;
|
||||
use std::sync::Arc;
|
||||
use tokio::io::AsyncRead;
|
||||
use tokio::io::AsyncWrite;
|
||||
use tokio::sync::mpsc::error::SendTimeoutError;
|
||||
use tokio::sync::mpsc::{channel, Receiver, Sender};
|
||||
use tokio::task;
|
||||
use tokio::task::JoinHandle;
|
||||
use tokio::time::{Duration, Instant, MissedTickBehavior};
|
||||
use tokio::time::{Duration, MissedTickBehavior};
|
||||
use tracing::*;
|
||||
use utils::id::TenantTimelineId;
|
||||
use utils::lsn::Lsn;
|
||||
@@ -90,7 +85,6 @@ impl WalReceivers {
|
||||
};
|
||||
|
||||
self.update_num(&shared);
|
||||
WAL_RECEIVERS.inc();
|
||||
|
||||
WalReceiverGuard {
|
||||
id: pos,
|
||||
@@ -149,7 +143,6 @@ impl WalReceivers {
|
||||
let mut shared = self.mutex.lock();
|
||||
shared.slots[id] = None;
|
||||
self.update_num(&shared);
|
||||
WAL_RECEIVERS.dec();
|
||||
}
|
||||
|
||||
/// Broadcast pageserver feedback to connected walproposers.
|
||||
@@ -391,36 +384,10 @@ async fn read_network_loop<IO: AsyncRead + AsyncWrite + Unpin>(
|
||||
msg_tx: Sender<ProposerAcceptorMessage>,
|
||||
mut next_msg: ProposerAcceptorMessage,
|
||||
) -> Result<(), CopyStreamHandlerEnd> {
|
||||
/// Threshold for logging slow WalAcceptor sends.
|
||||
const SLOW_THRESHOLD: Duration = Duration::from_secs(5);
|
||||
|
||||
loop {
|
||||
let started = Instant::now();
|
||||
let size = next_msg.size();
|
||||
match msg_tx.send_timeout(next_msg, SLOW_THRESHOLD).await {
|
||||
Ok(()) => {}
|
||||
// Slow send, log a message and keep trying. Log context has timeline ID.
|
||||
Err(SendTimeoutError::Timeout(next_msg)) => {
|
||||
warn!(
|
||||
"slow WalAcceptor send blocked for {:.3}s",
|
||||
Instant::now().duration_since(started).as_secs_f64()
|
||||
);
|
||||
if msg_tx.send(next_msg).await.is_err() {
|
||||
return Ok(()); // WalAcceptor terminated
|
||||
}
|
||||
warn!(
|
||||
"slow WalAcceptor send completed after {:.3}s",
|
||||
Instant::now().duration_since(started).as_secs_f64()
|
||||
)
|
||||
}
|
||||
// WalAcceptor terminated.
|
||||
Err(SendTimeoutError::Closed(_)) => return Ok(()),
|
||||
if msg_tx.send(next_msg).await.is_err() {
|
||||
return Ok(()); // chan closed, WalAcceptor terminated
|
||||
}
|
||||
|
||||
// Update metrics. Will be decremented in WalAcceptor.
|
||||
WAL_RECEIVER_QUEUE_DEPTH_TOTAL.inc();
|
||||
WAL_RECEIVER_QUEUE_SIZE_TOTAL.add(size as i64);
|
||||
|
||||
next_msg = read_message(pgb_reader).await?;
|
||||
}
|
||||
}
|
||||
@@ -478,12 +445,6 @@ async fn network_write<IO: AsyncRead + AsyncWrite + Unpin>(
|
||||
/// walproposer, even when it's writing a steady stream of messages.
|
||||
const FLUSH_INTERVAL: Duration = Duration::from_secs(1);
|
||||
|
||||
/// The metrics computation interval.
|
||||
///
|
||||
/// The Prometheus poll interval is 60 seconds at the time of writing. We sample the queue depth
|
||||
/// every 5 seconds, for 12 samples per poll. This will give a count of up to 12x active timelines.
|
||||
const METRICS_INTERVAL: Duration = Duration::from_secs(5);
|
||||
|
||||
/// Encapsulates a task which takes messages from msg_rx, processes and pushes
|
||||
/// replies to reply_tx.
|
||||
///
|
||||
@@ -530,15 +491,12 @@ impl WalAcceptor {
|
||||
async fn run(&mut self) -> anyhow::Result<()> {
|
||||
let walreceiver_guard = self.tli.get_walreceivers().register(self.conn_id);
|
||||
|
||||
// Periodically flush the WAL and compute metrics.
|
||||
// Periodically flush the WAL.
|
||||
let mut flush_ticker = tokio::time::interval(FLUSH_INTERVAL);
|
||||
flush_ticker.set_missed_tick_behavior(MissedTickBehavior::Delay);
|
||||
flush_ticker.tick().await; // skip the initial, immediate tick
|
||||
|
||||
let mut metrics_ticker = tokio::time::interval(METRICS_INTERVAL);
|
||||
metrics_ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
|
||||
|
||||
// Tracks whether we have unflushed appends.
|
||||
// Tracks unflushed appends.
|
||||
let mut dirty = false;
|
||||
|
||||
loop {
|
||||
@@ -550,10 +508,6 @@ impl WalAcceptor {
|
||||
break;
|
||||
};
|
||||
|
||||
// Update gauge metrics.
|
||||
WAL_RECEIVER_QUEUE_DEPTH_TOTAL.dec();
|
||||
WAL_RECEIVER_QUEUE_SIZE_TOTAL.sub(msg.size() as i64);
|
||||
|
||||
// Update walreceiver state in shmem for reporting.
|
||||
if let ProposerAcceptorMessage::Elected(_) = &msg {
|
||||
walreceiver_guard.get().status = WalReceiverStatus::Streaming;
|
||||
@@ -590,12 +544,6 @@ impl WalAcceptor {
|
||||
.process_msg(&ProposerAcceptorMessage::FlushWAL)
|
||||
.await?
|
||||
}
|
||||
|
||||
// Update histogram metrics periodically.
|
||||
_ = metrics_ticker.tick() => {
|
||||
WAL_RECEIVER_QUEUE_DEPTH.observe(self.msg_rx.len() as f64);
|
||||
None // no reply
|
||||
}
|
||||
};
|
||||
|
||||
// Send reply, if any.
|
||||
@@ -616,14 +564,3 @@ impl WalAcceptor {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// On drop, drain msg_rx and update metrics to avoid leaks.
|
||||
impl Drop for WalAcceptor {
|
||||
fn drop(&mut self) {
|
||||
self.msg_rx.close(); // prevent further sends
|
||||
while let Ok(msg) = self.msg_rx.try_recv() {
|
||||
WAL_RECEIVER_QUEUE_DEPTH_TOTAL.dec();
|
||||
WAL_RECEIVER_QUEUE_SIZE_TOTAL.sub(msg.size() as i64);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -422,70 +422,6 @@ impl ProposerAcceptorMessage {
|
||||
_ => bail!("unknown proposer-acceptor message tag: {}", tag),
|
||||
}
|
||||
}
|
||||
|
||||
/// The memory size of the message, including byte slices.
|
||||
pub fn size(&self) -> usize {
|
||||
const BASE_SIZE: usize = std::mem::size_of::<ProposerAcceptorMessage>();
|
||||
|
||||
// For most types, the size is just the base enum size including the nested structs. Some
|
||||
// types also contain byte slices; add them.
|
||||
//
|
||||
// We explicitly list all fields, to draw attention here when new fields are added.
|
||||
let mut size = BASE_SIZE;
|
||||
size += match self {
|
||||
Self::Greeting(ProposerGreeting {
|
||||
protocol_version: _,
|
||||
pg_version: _,
|
||||
proposer_id: _,
|
||||
system_id: _,
|
||||
timeline_id: _,
|
||||
tenant_id: _,
|
||||
tli: _,
|
||||
wal_seg_size: _,
|
||||
}) => 0,
|
||||
|
||||
Self::VoteRequest(VoteRequest { term: _ }) => 0,
|
||||
|
||||
Self::Elected(ProposerElected {
|
||||
term: _,
|
||||
start_streaming_at: _,
|
||||
term_history: _,
|
||||
timeline_start_lsn: _,
|
||||
}) => 0,
|
||||
|
||||
Self::AppendRequest(AppendRequest {
|
||||
h:
|
||||
AppendRequestHeader {
|
||||
term: _,
|
||||
term_start_lsn: _,
|
||||
begin_lsn: _,
|
||||
end_lsn: _,
|
||||
commit_lsn: _,
|
||||
truncate_lsn: _,
|
||||
proposer_uuid: _,
|
||||
},
|
||||
wal_data,
|
||||
}) => wal_data.len(),
|
||||
|
||||
Self::NoFlushAppendRequest(AppendRequest {
|
||||
h:
|
||||
AppendRequestHeader {
|
||||
term: _,
|
||||
term_start_lsn: _,
|
||||
begin_lsn: _,
|
||||
end_lsn: _,
|
||||
commit_lsn: _,
|
||||
truncate_lsn: _,
|
||||
proposer_uuid: _,
|
||||
},
|
||||
wal_data,
|
||||
}) => wal_data.len(),
|
||||
|
||||
Self::FlushWAL => 0,
|
||||
};
|
||||
|
||||
size
|
||||
}
|
||||
}
|
||||
|
||||
/// Acceptor -> Proposer messages
|
||||
|
||||
@@ -28,7 +28,6 @@ tokio = { workspace = true, features = ["rt-multi-thread"] }
|
||||
tracing.workspace = true
|
||||
metrics.workspace = true
|
||||
utils.workspace = true
|
||||
rustls.workspace = true
|
||||
|
||||
workspace_hack.workspace = true
|
||||
|
||||
|
||||
@@ -52,12 +52,6 @@ where
|
||||
// If schema starts with https, start encrypted connection; do plain text
|
||||
// otherwise.
|
||||
if let Some("https") = tonic_endpoint.uri().scheme_str() {
|
||||
// if there's no default provider and both ring+aws-lc-rs are enabled
|
||||
// this the tls settings on tonic will not work.
|
||||
// erroring is ok.
|
||||
rustls::crypto::ring::default_provider()
|
||||
.install_default()
|
||||
.ok();
|
||||
let tls = ClientTlsConfig::new();
|
||||
tonic_endpoint = tonic_endpoint.tls_config(tls)?;
|
||||
}
|
||||
|
||||
@@ -658,7 +658,7 @@ async fn handle_node_register(req: Request<Body>) -> Result<Response<Body>, ApiE
|
||||
}
|
||||
|
||||
async fn handle_node_list(req: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
check_permissions(&req, Scope::Infra)?;
|
||||
check_permissions(&req, Scope::Admin)?;
|
||||
|
||||
let req = match maybe_forward(req).await {
|
||||
ForwardOutcome::Forwarded(res) => {
|
||||
@@ -737,7 +737,7 @@ async fn handle_node_configure(req: Request<Body>) -> Result<Response<Body>, Api
|
||||
}
|
||||
|
||||
async fn handle_node_status(req: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
check_permissions(&req, Scope::Infra)?;
|
||||
check_permissions(&req, Scope::Admin)?;
|
||||
|
||||
let req = match maybe_forward(req).await {
|
||||
ForwardOutcome::Forwarded(res) => {
|
||||
@@ -786,7 +786,7 @@ async fn handle_get_leader(req: Request<Body>) -> Result<Response<Body>, ApiErro
|
||||
}
|
||||
|
||||
async fn handle_node_drain(req: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
check_permissions(&req, Scope::Infra)?;
|
||||
check_permissions(&req, Scope::Admin)?;
|
||||
|
||||
let req = match maybe_forward(req).await {
|
||||
ForwardOutcome::Forwarded(res) => {
|
||||
@@ -804,7 +804,7 @@ async fn handle_node_drain(req: Request<Body>) -> Result<Response<Body>, ApiErro
|
||||
}
|
||||
|
||||
async fn handle_cancel_node_drain(req: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
check_permissions(&req, Scope::Infra)?;
|
||||
check_permissions(&req, Scope::Admin)?;
|
||||
|
||||
let req = match maybe_forward(req).await {
|
||||
ForwardOutcome::Forwarded(res) => {
|
||||
@@ -822,7 +822,7 @@ async fn handle_cancel_node_drain(req: Request<Body>) -> Result<Response<Body>,
|
||||
}
|
||||
|
||||
async fn handle_node_fill(req: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
check_permissions(&req, Scope::Infra)?;
|
||||
check_permissions(&req, Scope::Admin)?;
|
||||
|
||||
let req = match maybe_forward(req).await {
|
||||
ForwardOutcome::Forwarded(res) => {
|
||||
@@ -840,7 +840,7 @@ async fn handle_node_fill(req: Request<Body>) -> Result<Response<Body>, ApiError
|
||||
}
|
||||
|
||||
async fn handle_cancel_node_fill(req: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
check_permissions(&req, Scope::Infra)?;
|
||||
check_permissions(&req, Scope::Admin)?;
|
||||
|
||||
let req = match maybe_forward(req).await {
|
||||
ForwardOutcome::Forwarded(res) => {
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
use std::{sync::Arc, time::Duration};
|
||||
|
||||
use pageserver_api::controller_api::ShardSchedulingPolicy;
|
||||
use rand::seq::SliceRandom;
|
||||
use rand::thread_rng;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
@@ -48,16 +47,6 @@ impl ChaosInjector {
|
||||
.get_mut(victim)
|
||||
.expect("Held lock between choosing ID and this get");
|
||||
|
||||
if !matches!(shard.get_scheduling_policy(), ShardSchedulingPolicy::Active) {
|
||||
// Skip non-active scheduling policies, so that a shard with a policy like Pause can
|
||||
// be pinned without being disrupted by us.
|
||||
tracing::info!(
|
||||
"Skipping shard {victim}: scheduling policy is {:?}",
|
||||
shard.get_scheduling_policy()
|
||||
);
|
||||
continue;
|
||||
}
|
||||
|
||||
// Pick a secondary to promote
|
||||
let Some(new_location) = shard
|
||||
.intent
|
||||
@@ -74,8 +63,6 @@ impl ChaosInjector {
|
||||
continue;
|
||||
};
|
||||
|
||||
tracing::info!("Injecting chaos: migrate {victim} {old_location}->{new_location}");
|
||||
|
||||
shard.intent.demote_attached(scheduler, old_location);
|
||||
shard.intent.promote_attached(scheduler, new_location);
|
||||
self.service.maybe_reconcile_shard(shard, nodes);
|
||||
|
||||
@@ -6,7 +6,7 @@ use once_cell::sync::OnceCell;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use postgres_ffi::{XLogFileName, PG_TLI};
|
||||
use remote_storage::GenericRemoteStorage;
|
||||
use rustls::crypto::ring;
|
||||
use rustls::crypto::aws_lc_rs;
|
||||
use serde::Serialize;
|
||||
use tokio_postgres::types::PgLsn;
|
||||
use tracing::{debug, error, info};
|
||||
@@ -256,9 +256,9 @@ async fn load_timelines_from_db(
|
||||
// Use rustls (Neon requires TLS)
|
||||
let root_store = TLS_ROOTS.get_or_try_init(load_certs)?.clone();
|
||||
let client_config =
|
||||
rustls::ClientConfig::builder_with_provider(Arc::new(ring::default_provider()))
|
||||
rustls::ClientConfig::builder_with_provider(Arc::new(aws_lc_rs::default_provider()))
|
||||
.with_safe_default_protocol_versions()
|
||||
.context("ring should support the default protocol versions")?
|
||||
.context("aws_lc_rs should support the default protocol versions")?
|
||||
.with_root_certificates(root_store)
|
||||
.with_no_client_auth();
|
||||
let tls_connector = tokio_postgres_rustls::MakeRustlsConnect::new(client_config);
|
||||
|
||||
@@ -1397,7 +1397,7 @@ def neon_simple_env(
|
||||
pageserver_virtual_file_io_mode: Optional[str],
|
||||
) -> Iterator[NeonEnv]:
|
||||
"""
|
||||
Simple Neon environment, with 1 safekeeper and 1 pageserver. No authentication, no fsync.
|
||||
Simple Neon environment, with no authentication and no safekeepers.
|
||||
|
||||
This fixture will use RemoteStorageKind.LOCAL_FS with pageserver.
|
||||
"""
|
||||
@@ -4701,7 +4701,6 @@ def tenant_get_shards(
|
||||
|
||||
If the caller provides `pageserver_id`, it will be used for all shards, even
|
||||
if the shard is indicated by storage controller to be on some other pageserver.
|
||||
If the storage controller is not running, assume an unsharded tenant.
|
||||
|
||||
Caller should over the response to apply their per-pageserver action to
|
||||
each shard
|
||||
@@ -4711,17 +4710,17 @@ def tenant_get_shards(
|
||||
else:
|
||||
override_pageserver = None
|
||||
|
||||
if not env.storage_controller.running and override_pageserver is not None:
|
||||
log.warning(f"storage controller not running, assuming unsharded tenant {tenant_id}")
|
||||
return [(TenantShardId(tenant_id, 0, 0), override_pageserver)]
|
||||
|
||||
return [
|
||||
(
|
||||
TenantShardId.parse(s["shard_id"]),
|
||||
override_pageserver or env.get_pageserver(s["node_id"]),
|
||||
)
|
||||
for s in env.storage_controller.locate(tenant_id)
|
||||
]
|
||||
if len(env.pageservers) > 1:
|
||||
return [
|
||||
(
|
||||
TenantShardId.parse(s["shard_id"]),
|
||||
override_pageserver or env.get_pageserver(s["node_id"]),
|
||||
)
|
||||
for s in env.storage_controller.locate(tenant_id)
|
||||
]
|
||||
else:
|
||||
# Assume an unsharded tenant
|
||||
return [(TenantShardId(tenant_id, 0, 0), override_pageserver or env.pageserver)]
|
||||
|
||||
|
||||
def wait_replica_caughtup(primary: Endpoint, secondary: Endpoint):
|
||||
|
||||
@@ -404,12 +404,6 @@ class PageserverHttpClient(requests.Session, MetricsGetter):
|
||||
return res.json()
|
||||
|
||||
def set_tenant_config(self, tenant_id: Union[TenantId, TenantShardId], config: dict[str, Any]):
|
||||
"""
|
||||
Only use this via storage_controller.pageserver_api().
|
||||
|
||||
Storcon is the authority on tenant config - changes you make directly
|
||||
against pageserver may be reconciled away at any time.
|
||||
"""
|
||||
assert "tenant_id" not in config.keys()
|
||||
res = self.put(
|
||||
f"http://localhost:{self.port}/v1/tenant/config",
|
||||
@@ -423,11 +417,6 @@ class PageserverHttpClient(requests.Session, MetricsGetter):
|
||||
inserts: Optional[dict[str, Any]] = None,
|
||||
removes: Optional[list[str]] = None,
|
||||
):
|
||||
"""
|
||||
Only use this via storage_controller.pageserver_api().
|
||||
|
||||
See `set_tenant_config` for more information.
|
||||
"""
|
||||
current = self.tenant_config(tenant_id).tenant_specific_overrides
|
||||
if inserts is not None:
|
||||
current.update(inserts)
|
||||
|
||||
@@ -1,71 +0,0 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from contextlib import closing
|
||||
|
||||
import pytest
|
||||
from fixtures.benchmark_fixture import MetricReport, NeonBenchmarker
|
||||
from fixtures.common_types import Lsn, TenantShardId
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import (
|
||||
NeonEnvBuilder,
|
||||
tenant_get_shards,
|
||||
wait_for_last_flush_lsn,
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.timeout(600)
|
||||
@pytest.mark.parametrize("shard_count", [1, 8, 32])
|
||||
def test_sharded_ingest(
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
zenbenchmark: NeonBenchmarker,
|
||||
shard_count: int,
|
||||
):
|
||||
"""
|
||||
Benchmarks sharded ingestion throughput, by ingesting a large amount of WAL into a Safekeeper
|
||||
and fanning out to a large number of shards on dedicated Pageservers. Comparing the base case
|
||||
(shard_count=1) to the sharded case indicates the overhead of sharding.
|
||||
"""
|
||||
|
||||
ROW_COUNT = 100_000_000 # about 7 GB of WAL
|
||||
|
||||
neon_env_builder.num_pageservers = shard_count
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
# Create a sharded tenant and timeline, and migrate it to the respective pageservers. Ensure
|
||||
# the storage controller doesn't mess with shard placements.
|
||||
#
|
||||
# TODO: there should be a way to disable storage controller background reconciliations.
|
||||
# Currently, disabling reconciliation also disables foreground operations.
|
||||
tenant_id, timeline_id = env.create_tenant(shard_count=shard_count)
|
||||
|
||||
for shard_number in range(0, shard_count):
|
||||
tenant_shard_id = TenantShardId(tenant_id, shard_number, shard_count)
|
||||
pageserver_id = shard_number + 1
|
||||
env.storage_controller.tenant_shard_migrate(tenant_shard_id, pageserver_id)
|
||||
|
||||
shards = tenant_get_shards(env, tenant_id)
|
||||
env.storage_controller.reconcile_until_idle()
|
||||
assert tenant_get_shards(env, tenant_id) == shards, "shards moved"
|
||||
|
||||
# Start the endpoint.
|
||||
endpoint = env.endpoints.create_start("main", tenant_id=tenant_id)
|
||||
start_lsn = Lsn(endpoint.safe_psql("select pg_current_wal_lsn()")[0][0])
|
||||
|
||||
# Ingest data and measure WAL volume and duration.
|
||||
with closing(endpoint.connect()) as conn:
|
||||
with conn.cursor() as cur:
|
||||
log.info("Ingesting data")
|
||||
cur.execute("set statement_timeout = 0")
|
||||
cur.execute("create table huge (i int, j int)")
|
||||
|
||||
with zenbenchmark.record_duration("pageserver_ingest"):
|
||||
with zenbenchmark.record_duration("wal_ingest"):
|
||||
cur.execute(f"insert into huge values (generate_series(1, {ROW_COUNT}), 0)")
|
||||
|
||||
wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
|
||||
|
||||
end_lsn = Lsn(endpoint.safe_psql("select pg_current_wal_lsn()")[0][0])
|
||||
wal_written_mb = round((end_lsn - start_lsn) / (1024 * 1024))
|
||||
zenbenchmark.record("wal_written", wal_written_mb, "MB", MetricReport.TEST_PARAM)
|
||||
|
||||
assert tenant_get_shards(env, tenant_id) == shards, "shards moved"
|
||||
@@ -176,21 +176,17 @@ def test_fully_custom_config(positive_env: NeonEnv):
|
||||
"lsn_lease_length_for_ts": "5s",
|
||||
}
|
||||
|
||||
vps_http = env.storage_controller.pageserver_api()
|
||||
ps_http = env.pageserver.http_client()
|
||||
|
||||
initial_tenant_config = vps_http.tenant_config(env.initial_tenant)
|
||||
assert [
|
||||
(key, val)
|
||||
for key, val in initial_tenant_config.tenant_specific_overrides.items()
|
||||
if val is not None
|
||||
] == []
|
||||
initial_tenant_config = ps_http.tenant_config(env.initial_tenant)
|
||||
assert initial_tenant_config.tenant_specific_overrides == {}
|
||||
assert set(initial_tenant_config.effective_config.keys()) == set(
|
||||
fully_custom_config.keys()
|
||||
), "ensure we cover all config options"
|
||||
|
||||
(tenant_id, _) = env.create_tenant()
|
||||
vps_http.set_tenant_config(tenant_id, fully_custom_config)
|
||||
our_tenant_config = vps_http.tenant_config(tenant_id)
|
||||
ps_http.set_tenant_config(tenant_id, fully_custom_config)
|
||||
our_tenant_config = ps_http.tenant_config(tenant_id)
|
||||
assert our_tenant_config.tenant_specific_overrides == fully_custom_config
|
||||
assert set(our_tenant_config.effective_config.keys()) == set(
|
||||
fully_custom_config.keys()
|
||||
@@ -203,10 +199,10 @@ def test_fully_custom_config(positive_env: NeonEnv):
|
||||
== {k: True for k in fully_custom_config.keys()}
|
||||
), "ensure our custom config has different values than the default config for all config options, so we know we overrode everything"
|
||||
|
||||
env.pageserver.tenant_detach(tenant_id)
|
||||
ps_http.tenant_detach(tenant_id)
|
||||
env.pageserver.tenant_attach(tenant_id, config=fully_custom_config)
|
||||
|
||||
assert vps_http.tenant_config(tenant_id).tenant_specific_overrides == fully_custom_config
|
||||
assert set(vps_http.tenant_config(tenant_id).effective_config.keys()) == set(
|
||||
assert ps_http.tenant_config(tenant_id).tenant_specific_overrides == fully_custom_config
|
||||
assert set(ps_http.tenant_config(tenant_id).effective_config.keys()) == set(
|
||||
fully_custom_config.keys()
|
||||
), "ensure we cover all config options"
|
||||
|
||||
@@ -103,6 +103,7 @@ def test_timeline_init_break_before_checkpoint(neon_env_builder: NeonEnvBuilder)
|
||||
env.pageserver.allowed_errors.extend(
|
||||
[
|
||||
".*Failed to process timeline dir contents.*Timeline has no ancestor and no layer files.*",
|
||||
".*Timeline got dropped without initializing, cleaning its files.*",
|
||||
]
|
||||
)
|
||||
|
||||
@@ -144,6 +145,7 @@ def test_timeline_init_break_before_checkpoint_recreate(
|
||||
env.pageserver.allowed_errors.extend(
|
||||
[
|
||||
".*Failed to process timeline dir contents.*Timeline has no ancestor and no layer files.*",
|
||||
".*Timeline got dropped without initializing, cleaning its files.*",
|
||||
".*Failed to load index_part from remote storage, failed creation?.*",
|
||||
]
|
||||
)
|
||||
|
||||
@@ -38,24 +38,21 @@ def test_min_resident_size_override_handling(
|
||||
neon_env_builder: NeonEnvBuilder, config_level_override: int
|
||||
):
|
||||
env = neon_env_builder.init_start()
|
||||
vps_http = env.storage_controller.pageserver_api()
|
||||
ps_http = env.pageserver.http_client()
|
||||
|
||||
def assert_config(tenant_id, expect_override, expect_effective):
|
||||
# talk to actual pageserver to _get_ the config, workaround for
|
||||
# https://github.com/neondatabase/neon/issues/9621
|
||||
config = ps_http.tenant_config(tenant_id)
|
||||
assert config.tenant_specific_overrides.get("min_resident_size_override") == expect_override
|
||||
assert config.effective_config.get("min_resident_size_override") == expect_effective
|
||||
|
||||
def assert_overrides(tenant_id, default_tenant_conf_value):
|
||||
vps_http.set_tenant_config(tenant_id, {"min_resident_size_override": 200})
|
||||
ps_http.set_tenant_config(tenant_id, {"min_resident_size_override": 200})
|
||||
assert_config(tenant_id, 200, 200)
|
||||
|
||||
vps_http.set_tenant_config(tenant_id, {"min_resident_size_override": 0})
|
||||
ps_http.set_tenant_config(tenant_id, {"min_resident_size_override": 0})
|
||||
assert_config(tenant_id, 0, 0)
|
||||
|
||||
vps_http.set_tenant_config(tenant_id, {})
|
||||
ps_http.set_tenant_config(tenant_id, {})
|
||||
assert_config(tenant_id, None, default_tenant_conf_value)
|
||||
|
||||
if config_level_override is not None:
|
||||
@@ -75,7 +72,7 @@ def test_min_resident_size_override_handling(
|
||||
# Also ensure that specifying the paramter to create_tenant works, in addition to http-level recconfig.
|
||||
tenant_id, _ = env.create_tenant(conf={"min_resident_size_override": "100"})
|
||||
assert_config(tenant_id, 100, 100)
|
||||
vps_http.set_tenant_config(tenant_id, {})
|
||||
ps_http.set_tenant_config(tenant_id, {})
|
||||
assert_config(tenant_id, None, config_level_override)
|
||||
|
||||
|
||||
@@ -460,10 +457,10 @@ def test_pageserver_respects_overridden_resident_size(
|
||||
assert (
|
||||
du_by_timeline[large_tenant] > min_resident_size
|
||||
), "ensure the larger tenant will get a haircut"
|
||||
env.neon_env.storage_controller.pageserver_api().patch_tenant_config_client_side(
|
||||
ps_http.patch_tenant_config_client_side(
|
||||
small_tenant[0], {"min_resident_size_override": min_resident_size}
|
||||
)
|
||||
env.neon_env.storage_controller.pageserver_api().patch_tenant_config_client_side(
|
||||
ps_http.patch_tenant_config_client_side(
|
||||
large_tenant[0], {"min_resident_size_override": min_resident_size}
|
||||
)
|
||||
|
||||
|
||||
@@ -91,6 +91,7 @@ def test_import_from_vanilla(test_output_dir, pg_bin, vanilla_pg, neon_env_build
|
||||
[
|
||||
".*Failed to import basebackup.*",
|
||||
".*unexpected non-zero bytes after the tar archive.*",
|
||||
".*Timeline got dropped without initializing, cleaning its files.*",
|
||||
".*InternalServerError.*timeline not found.*",
|
||||
".*InternalServerError.*Tenant .* not found.*",
|
||||
".*InternalServerError.*Timeline .* not found.*",
|
||||
|
||||
@@ -81,7 +81,7 @@ def test_ingesting_large_batches_of_images(neon_env_builder: NeonEnvBuilder, bui
|
||||
print_layer_size_histogram(post_ingest)
|
||||
|
||||
# since all we have are L0s, we should be getting nice L1s and images out of them now
|
||||
env.storage_controller.pageserver_api().patch_tenant_config_client_side(
|
||||
ps_http.patch_tenant_config_client_side(
|
||||
env.initial_tenant,
|
||||
{
|
||||
"compaction_threshold": 1,
|
||||
|
||||
@@ -127,7 +127,7 @@ def test_issue_5878(neon_env_builder: NeonEnvBuilder):
|
||||
), "sanity check for what above loop is supposed to do"
|
||||
|
||||
# create the image layer from the future
|
||||
env.storage_controller.pageserver_api().patch_tenant_config_client_side(
|
||||
ps_http.patch_tenant_config_client_side(
|
||||
tenant_id, {"image_creation_threshold": image_creation_threshold}, None
|
||||
)
|
||||
assert ps_http.tenant_config(tenant_id).effective_config["image_creation_threshold"] == 1
|
||||
|
||||
@@ -46,9 +46,7 @@ def test_local_only_layers_after_crash(neon_env_builder: NeonEnvBuilder, pg_bin:
|
||||
for sk in env.safekeepers:
|
||||
sk.stop()
|
||||
|
||||
env.storage_controller.pageserver_api().patch_tenant_config_client_side(
|
||||
tenant_id, {"compaction_threshold": 3}
|
||||
)
|
||||
pageserver_http.patch_tenant_config_client_side(tenant_id, {"compaction_threshold": 3})
|
||||
# hit the exit failpoint
|
||||
with pytest.raises(ConnectionError, match="Remote end closed connection without response"):
|
||||
pageserver_http.timeline_checkpoint(tenant_id, timeline_id)
|
||||
|
||||
@@ -146,13 +146,13 @@ def test_throttle_fair_config_is_settable_but_ignored_in_mgmt_api(neon_env_build
|
||||
To be removed after https://github.com/neondatabase/neon/pull/8539 is rolled out.
|
||||
"""
|
||||
env = neon_env_builder.init_start()
|
||||
vps_http = env.storage_controller.pageserver_api()
|
||||
ps_http = env.pageserver.http_client()
|
||||
# with_fair config should still be settable
|
||||
vps_http.set_tenant_config(
|
||||
ps_http.set_tenant_config(
|
||||
env.initial_tenant,
|
||||
{"timeline_get_throttle": throttle_config_with_field_fair_set},
|
||||
)
|
||||
conf = vps_http.tenant_config(env.initial_tenant)
|
||||
conf = ps_http.tenant_config(env.initial_tenant)
|
||||
assert_throttle_config_with_field_fair_set(conf.effective_config["timeline_get_throttle"])
|
||||
assert_throttle_config_with_field_fair_set(
|
||||
conf.tenant_specific_overrides["timeline_get_throttle"]
|
||||
|
||||
@@ -52,9 +52,7 @@ def test_tenant_s3_restore(
|
||||
tenant_id = env.initial_tenant
|
||||
|
||||
# now lets create the small layers
|
||||
env.storage_controller.pageserver_api().set_tenant_config(
|
||||
tenant_id, many_small_layers_tenant_config()
|
||||
)
|
||||
ps_http.set_tenant_config(tenant_id, many_small_layers_tenant_config())
|
||||
|
||||
# Default tenant and the one we created
|
||||
assert ps_http.get_metric_value("pageserver_tenant_manager_slots", {"mode": "attached"}) == 1
|
||||
|
||||
@@ -146,6 +146,8 @@ def test_long_timeline_create_cancelled_by_tenant_delete(neon_env_builder: NeonE
|
||||
|
||||
env.pageserver.allowed_errors.extend(
|
||||
[
|
||||
# happens with the cancellation bailing flushing loop earlier, leaving disk_consistent_lsn at zero
|
||||
".*Timeline got dropped without initializing, cleaning its files",
|
||||
# the response hit_pausable_failpoint_and_later_fail
|
||||
f".*Error processing HTTP request: InternalServerError\\(new timeline {env.initial_tenant}/{env.initial_timeline} has invalid disk_consistent_lsn",
|
||||
]
|
||||
|
||||
@@ -213,13 +213,6 @@ def test_timeline_offloading(neon_env_builder: NeonEnvBuilder, manual_offload: b
|
||||
wait_until(30, 1, leaf_offloaded)
|
||||
wait_until(30, 1, parent_offloaded)
|
||||
|
||||
# Offloaded child timelines should still prevent deletion
|
||||
with pytest.raises(
|
||||
PageserverApiException,
|
||||
match=f".* timeline which has child timelines: \\[{leaf_timeline_id}\\]",
|
||||
):
|
||||
ps_http.timeline_delete(tenant_id, parent_timeline_id)
|
||||
|
||||
ps_http.timeline_archival_config(
|
||||
tenant_id,
|
||||
grandparent_timeline_id,
|
||||
|
||||
@@ -511,7 +511,7 @@ def test_compaction_induced_by_detaches_in_history(
|
||||
|
||||
assert len(delta_layers(branch_timeline_id)) == 5
|
||||
|
||||
env.storage_controller.pageserver_api().patch_tenant_config_client_side(
|
||||
client.patch_tenant_config_client_side(
|
||||
env.initial_tenant, {"compaction_threshold": 5}, None
|
||||
)
|
||||
|
||||
|
||||
2
vendor/postgres-v17
vendored
2
vendor/postgres-v17
vendored
Submodule vendor/postgres-v17 updated: 9ad2f3c5c3...68b5038f27
2
vendor/revisions.json
vendored
2
vendor/revisions.json
vendored
@@ -1,7 +1,7 @@
|
||||
{
|
||||
"v17": [
|
||||
"17.0",
|
||||
"9ad2f3c5c37c08069a01c1e3f6b7cf275437e0cb"
|
||||
"68b5038f27e493bde6ae552fe066f10cbdfe6a14"
|
||||
],
|
||||
"v16": [
|
||||
"16.4",
|
||||
|
||||
@@ -47,7 +47,7 @@ hyper-dff4ba8e3ae991db = { package = "hyper", version = "1", features = ["full"]
|
||||
hyper-util = { version = "0.1", features = ["client-legacy", "server-auto", "service"] }
|
||||
indexmap-dff4ba8e3ae991db = { package = "indexmap", version = "1", default-features = false, features = ["std"] }
|
||||
indexmap-f595c2ba2a3f28df = { package = "indexmap", version = "2", features = ["serde"] }
|
||||
itertools = { version = "0.12" }
|
||||
itertools = { version = "0.10" }
|
||||
lazy_static = { version = "1", default-features = false, features = ["spin_no_std"] }
|
||||
libc = { version = "0.2", features = ["extra_traits", "use_std"] }
|
||||
log = { version = "0.4", default-features = false, features = ["std"] }
|
||||
@@ -65,7 +65,8 @@ regex = { version = "1" }
|
||||
regex-automata = { version = "0.4", default-features = false, features = ["dfa-onepass", "hybrid", "meta", "nfa-backtrack", "perf-inline", "perf-literal", "unicode"] }
|
||||
regex-syntax = { version = "0.8" }
|
||||
reqwest = { version = "0.12", default-features = false, features = ["blocking", "json", "rustls-tls", "stream"] }
|
||||
rustls = { version = "0.23", default-features = false, features = ["logging", "ring", "std", "tls12"] }
|
||||
rustls = { version = "0.23", features = ["ring"] }
|
||||
rustls-webpki = { version = "0.102", default-features = false, features = ["aws_lc_rs", "ring", "std"] }
|
||||
scopeguard = { version = "1" }
|
||||
serde = { version = "1", features = ["alloc", "derive"] }
|
||||
serde_json = { version = "1", features = ["alloc", "raw_value"] }
|
||||
@@ -79,7 +80,7 @@ tikv-jemalloc-sys = { version = "0.5" }
|
||||
time = { version = "0.3", features = ["macros", "serde-well-known"] }
|
||||
tokio = { version = "1", features = ["fs", "io-std", "io-util", "macros", "net", "process", "rt-multi-thread", "signal", "test-util"] }
|
||||
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev = "20031d7a9ee1addeae6e0968e3899ae6bf01cee2", features = ["with-serde_json-1"] }
|
||||
tokio-rustls = { version = "0.26", default-features = false, features = ["logging", "ring", "tls12"] }
|
||||
tokio-rustls = { version = "0.26", features = ["ring"] }
|
||||
tokio-stream = { version = "0.1", features = ["net"] }
|
||||
tokio-util = { version = "0.7", features = ["codec", "compat", "io", "rt"] }
|
||||
toml_edit = { version = "0.22", features = ["serde"] }
|
||||
@@ -105,7 +106,7 @@ half = { version = "2", default-features = false, features = ["num-traits"] }
|
||||
hashbrown = { version = "0.14", features = ["raw"] }
|
||||
indexmap-dff4ba8e3ae991db = { package = "indexmap", version = "1", default-features = false, features = ["std"] }
|
||||
indexmap-f595c2ba2a3f28df = { package = "indexmap", version = "2", features = ["serde"] }
|
||||
itertools = { version = "0.12" }
|
||||
itertools = { version = "0.10" }
|
||||
libc = { version = "0.2", features = ["extra_traits", "use_std"] }
|
||||
log = { version = "0.4", default-features = false, features = ["std"] }
|
||||
memchr = { version = "2" }
|
||||
|
||||
Reference in New Issue
Block a user