Compare commits


48 Commits

Author SHA1 Message Date
Alexander Bayandin
d884af4fb9 Test weird parameters 2022-09-30 14:18:36 +03:00
Alexander Bayandin
d98c53174e Test params 2022-09-30 12:50:48 +03:00
Arthur Petukhovsky
dabb6d2675 Fix log level for sk startup logs (#2526) 2022-09-27 10:36:17 +00:00
Arthur Petukhovsky
fc7087b16f Add metric for loaded safekeeper timelines (#2509) 2022-09-27 11:57:59 +03:00
Vadim Kharitonov
2233ca2a39 seqwait.rs unit tests don't check return value 2022-09-27 11:47:59 +03:00
Dmitry Rodionov
fb68d01449 Preserve task result in TaskHandle by keeping join handle around (#2521)
* Preserve task result in TaskHandle by keeping join handle around

The solution is not great, but it should help to debug the staging issue.
I tried to do it in the least destructive way. TaskHandle is used in only
one place, so it is OK to use something less generic unless we want
to extend its usage across the codebase. In its current form,
for its single usage place, it looks too abstract.

Some problems around this code:
1. The task can drop the event sender and continue running.
2. The task cannot be joined several times (probably not needed,
    but still, it can be surprising).
3. Had to split the task event into two types because anyhow::Error
    does not implement Clone. So TaskContinueEvent derives Clone,
    but the usual task event does not. The Clone requirement appears
    because we clone the current value in next_task_event;
    taking it by reference is complicated.
4. The split between Init and Started is artificial and comes from
    the watch::channel requirement to have some initial value.

    To summarize points 3 and 4: it may be a better idea to use
    an RwLock or a bounded channel instead.
2022-09-26 20:57:02 +00:00
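For context, a minimal sketch of the pattern this commit describes (hypothetical SketchHandle type, not the actual walreceiver code; assumes the tokio and anyhow crates): the JoinHandle is stored next to the event channel so the task's final Result can be recovered once the sender side is dropped.

use tokio::sync::watch;
use tokio::task::JoinHandle;

// Hypothetical stand-in for TaskHandle: keeping the JoinHandle around
// means the task's Result is not lost when the event sender is dropped.
struct SketchHandle<E> {
    events: watch::Receiver<Option<E>>,
    join: Option<JoinHandle<anyhow::Result<()>>>,
}

impl<E> SketchHandle<E> {
    // Drain remaining updates, then join the task to surface its result.
    // Joining twice is an error, matching problem 2 in the list above.
    async fn final_result(&mut self) -> anyhow::Result<()> {
        while self.events.changed().await.is_ok() {}
        match self.join.take() {
            Some(handle) => handle
                .await
                .map_err(|e| anyhow::anyhow!("failed to join task: {e}"))?,
            None => Err(anyhow::anyhow!("task was joined more than once")),
        }
    }
}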
Arthur Petukhovsky
d15116f2cc Update pg_version for old timelines 2022-09-26 19:57:03 +03:00
Stas Kelvich
df45c0d0e5 Disable plv8 again 2022-09-26 12:22:46 +03:00
Stas Kelvich
367cc01290 Fix deploy paths 2022-09-26 10:08:01 +03:00
Anastasia Lubennikova
1165686201 fix deploy lib paths for postgres 2022-09-23 22:23:43 +03:00
Anastasia Lubennikova
093264a695 Fix deploy bin and lib paths for postgres 2022-09-23 22:23:43 +03:00
sharnoff
805bb198c2 Miscellaneous small fixups (#2503)
Changes are:

 * Correct typo "firts" -> "first"
 * Change <empty panic with comment explaining> to <panic with message
   taken from the comment>
 * Fix weird indentation that rustfmt was failing to handle
 * Use existing `anyhow::{anyhow,bail}!` as `{anyhow,bail}!` if it's
   already in scope
 * Spell `Result<T, anyhow::Error>` as `anyhow::Result<T>`
   * In general, closer to matching the rest of the codebase
 * Change usages of `hash_map::Entry` to `Entry` when it's already in
   scope
   * A quick search shows our style on this one varies across the files
     it's used in
2022-09-23 11:49:28 -07:00
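A small illustrative sketch of the preferred spellings (not code from this PR; the function names are hypothetical):

use anyhow::{anyhow, bail, Result};
use std::collections::hash_map::Entry;
use std::collections::HashMap;

// `anyhow::Result<T>` via the imported alias, not `Result<T, anyhow::Error>`.
fn insert_once(map: &mut HashMap<String, u32>, key: &str, value: u32) -> Result<()> {
    // `Entry` used directly since it is in scope, not `hash_map::Entry::...`.
    match map.entry(key.to_owned()) {
        Entry::Occupied(_) => bail!("key {key} already present"), // plain `bail!`
        Entry::Vacant(v) => {
            v.insert(value);
        }
    }
    Ok(())
}

fn lookup(map: &HashMap<String, u32>, key: &str) -> Result<u32> {
    // plain `anyhow!`, since the macro is already imported
    map.get(key).copied().ok_or_else(|| anyhow!("no such key: {key}"))
}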
Stas Kelvich
5ccd54c699 Add support for h3-pg and re-enable plv8 2022-09-23 19:06:57 +03:00
Dmitry Ivanov
1dffba9de6 Write more tests for the proxy... (#1918)
And change a few more things in the process.
2022-09-23 18:30:44 +03:00
Alexander Bayandin
ebab89ebd2 test_runner: pass password to pgbench via PGPASSWORD (#2468) 2022-09-23 12:51:33 +00:00
MMeent
bc3ba23e0a Fix extreme metrics bloat in storage sync (#2506)
* Fix extreme metrics bloat in storage sync

From 78 metrics per (timeline, tenant) pair down to (max) 10 metrics per
(timeline, tenant) pair, plus another 117 metrics in a global histogram that
replaces the previous per-timeline histogram.

* Drop image sync operation metric series when dropping TimelineMetrics.
2022-09-23 14:35:36 +02:00
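The series counts quoted above can be checked against the histogram definition that appears later in this diff: three operation kinds times three statuses, with 10 explicit buckets each. A small sketch of the arithmetic, assuming the usual Prometheus convention that a histogram exports its buckets plus a +Inf bucket plus _sum and _count:

fn main() {
    let operation_kinds = 3; // upload | download | delete
    let statuses = 3; // success | failure | abort
    let series_per_histogram = 10 + 1 + 2; // explicit buckets + "+Inf" + _sum + _count
    let total = operation_kinds * statuses * series_per_histogram;
    assert_eq!(total, 117); // matches the "117 metrics" figure above
    println!("global histogram series: {total}");
}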
Alexander Bayandin
3e65209a06 Nightly Benchmarks: use Postgres binaries from artifacts (#2501) 2022-09-23 12:50:36 +01:00
Dmitry Rodionov
eb0c6bcf1a reenable storage deployments 2022-09-23 14:12:39 +03:00
Rory de Zoete
52819898e4 Extend image push step with production ECR (#2465)
* Extend image push step with production ECR

* Put copy step before auth change

* Use correct name

* Only push on main

* Fix typo
2022-09-23 11:25:29 +02:00
Sergey Melnikov
b0377f750a Add staging-test region to normal staging rollouts (#2500) 2022-09-23 11:25:26 +03:00
Dmitry Rodionov
43560506c0 remove duplicate walreceiver connection span 2022-09-23 00:27:24 +03:00
Anastasia Lubennikova
c81ede8644 Hotfix for safekeeper timelines with unknown pg_version.
Assume DEFAULT_PG_VERSION = 14
2022-09-22 21:17:36 +03:00
Anastasia Lubennikova
eb9200abc8 Use version-specific path in pytest CI script 2022-09-22 18:12:41 +03:00
Anastasia Lubennikova
7c1695e87d fix psql path in export_import_between_pageservers script 2022-09-22 18:12:41 +03:00
Anastasia Lubennikova
8b42c184e7 Update LD_LIBRARY_PATH in deploy scripts 2022-09-22 18:12:41 +03:00
Anastasia Lubennikova
7138db9279 Fix paths to postgres binaries in the deploy script 2022-09-22 18:12:41 +03:00
Egor Suvorov
262fa3be09 pageserver pg proto: add missing auth checks (#2494)
Fixes #1858
2022-09-22 17:07:08 +03:00
Anastasia Lubennikova
5e151192f5 Fix rebase conflicts in safekeeper code 2022-09-22 14:15:13 +03:00
Anastasia Lubennikova
2d012f0d32 Fix rebase conflicts in pageserver code 2022-09-22 14:15:13 +03:00
Anastasia Lubennikova
64f64d5637 Fix after rebase: bump vendor/postgres-v14 to match main 2022-09-22 14:15:13 +03:00
Anastasia Lubennikova
1fa7d6aebf Use DEFAULT_PG_VERSION env in CI pytest 2022-09-22 14:15:13 +03:00
Anastasia Lubennikova
d098542dde Make test_timeline_size_metrics more stable:
Compare size with vanilla Postgres size instead of a hardcoded value
2022-09-22 14:15:13 +03:00
Anastasia Lubennikova
eba419fda3 Clean up the pg_version choice code 2022-09-22 14:15:13 +03:00
Anastasia Lubennikova
d8d3cd49f4 Update libs/postgres_ffi/wal_craft/src/bin/wal_craft.rs
Co-authored-by: MMeent <matthias@neon.tech>
2022-09-22 14:15:13 +03:00
Anastasia Lubennikova
3618c242b9 use version specific find_end_of_wal function 2022-09-22 14:15:13 +03:00
Anastasia Lubennikova
ed6b75e301 show pg_version in create_timeline info span 2022-09-22 14:15:13 +03:00
Anastasia Lubennikova
862902f9e5 Update readme and openapi spec 2022-09-22 14:15:13 +03:00
Anastasia Lubennikova
8d890b3cbb fix clippy warnings 2022-09-22 14:15:13 +03:00
Anastasia Lubennikova
0fde59aa46 use pg_version in python tests 2022-09-22 14:15:13 +03:00
Anastasia Lubennikova
1255ef806f pass version to wal_craft.rs 2022-09-22 14:15:13 +03:00
Anastasia Lubennikova
5dddeb8d88 Use non-versioned pg_distrib dir 2022-09-22 14:15:13 +03:00
Anastasia Lubennikova
d45de3d58f update build scripts to match pg_distrib_dir versioning schema 2022-09-22 14:15:13 +03:00
Anastasia Lubennikova
a69e060f0f fix clippy warning 2022-09-22 14:15:13 +03:00
Anastasia Lubennikova
a4397d43e9 Rename waldecoder -> waldecoder_handler.rs. Add comments 2022-09-22 14:15:13 +03:00
Anastasia Lubennikova
03c606f7c5 Pass pg_version parameter to timeline import command.
Add pg_version field to LocalTimelineInfo.
Use pg_version in the export_import_between_pageservers script
2022-09-22 14:15:13 +03:00
Anastasia Lubennikova
9dfede8146 Handle backwards-compatibility of TimelineMetadata.
This commit bumps TimelineMetadata format version and makes it independent from STORAGE_FORMAT_VERSION.
2022-09-22 14:15:13 +03:00
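A minimal sketch of the version-gating idea, with hypothetical field names and version numbers (not the actual TimelineMetadata layout): the on-disk blob carries its own format version, so old files are upgraded on read while new writes use the bumped version, decoupled from STORAGE_FORMAT_VERSION.

// Illustrative only; assumes the anyhow crate.
const METADATA_FORMAT_VERSION: u32 = 4; // hypothetical current version

struct Metadata {
    format_version: u32,
    pg_version: u32,
}

fn upgrade(mut meta: Metadata) -> anyhow::Result<Metadata> {
    if meta.format_version == METADATA_FORMAT_VERSION - 1 {
        // Older files predate the pg_version field; assume v14, as elsewhere
        // in this PR (DEFAULT_PG_VERSION = 14).
        meta.pg_version = 14;
        meta.format_version = METADATA_FORMAT_VERSION;
    }
    anyhow::ensure!(
        meta.format_version == METADATA_FORMAT_VERSION,
        "unsupported metadata format version {}",
        meta.format_version
    );
    Ok(meta)
}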
Anastasia Lubennikova
86bf491981 Support pg 15
- Split postgres_ffi into two version-specific files.
- Preserve pg_version in timeline metadata.
- Use pg_version in safekeeper code. Check for a postgres major version mismatch.
- Clean up the code to use the DEFAULT_PG_VERSION constant everywhere, instead of hardcoding.

- Parameterize python tests: use the DEFAULT_PG_VERSION env and the pg_version fixture.
   To run tests using a specific PostgreSQL version, pass the DEFAULT_PG_VERSION environment variable:
   DEFAULT_PG_VERSION=15 ./scripts/pytest test_runner/regress
 Currently, not all tests pass, because the Rust code relies on the default version of PostgreSQL in a few places.
2022-09-22 14:15:13 +03:00
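A sketch of the version-dispatch shape the postgres_ffi split implies (module and function names are illustrative, not the real API):

pub const DEFAULT_PG_VERSION: u32 = 14;

// Hypothetical version-specific modules standing in for the split files.
mod v14 {
    pub fn describe() -> &'static str { "postgres_ffi for PostgreSQL 14" }
}
mod v15 {
    pub fn describe() -> &'static str { "postgres_ffi for PostgreSQL 15" }
}

// Callers pass the timeline's pg_version; an unknown major version is an
// error rather than a silent fallback.
pub fn describe(pg_version: u32) -> anyhow::Result<&'static str> {
    match pg_version {
        14 => Ok(v14::describe()),
        15 => Ok(v15::describe()),
        other => anyhow::bail!("unsupported PostgreSQL major version {other}"),
    }
}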
Dmitry Rodionov
e764c1e60f remove self argument from several spans 2022-09-22 11:41:04 +03:00
37 changed files with 573 additions and 276 deletions

View File

@@ -12,6 +12,9 @@ inputs:
description: "Allow to skip if file doesn't exist, fail otherwise"
default: false
required: false
prefix:
description: "S3 prefix. Default is '${GITHUB_RUN_ID}/${GITHUB_RUN_ATTEMPT}'"
required: false
runs:
using: "composite"
@@ -23,18 +26,18 @@ runs:
TARGET: ${{ inputs.path }}
ARCHIVE: /tmp/downloads/${{ inputs.name }}.tar.zst
SKIP_IF_DOES_NOT_EXIST: ${{ inputs.skip-if-does-not-exist }}
PREFIX: artifacts/${{ inputs.prefix || format('{0}/{1}', github.run_id, github.run_attempt) }}
run: |
BUCKET=neon-github-public-dev
PREFIX=artifacts/${GITHUB_RUN_ID}
FILENAME=$(basename $ARCHIVE)
S3_KEY=$(aws s3api list-objects-v2 --bucket ${BUCKET} --prefix ${PREFIX} | jq -r '.Contents[].Key' | grep ${FILENAME} | sort --version-sort | tail -1 || true)
S3_KEY=$(aws s3api list-objects-v2 --bucket ${BUCKET} --prefix ${PREFIX%$GITHUB_RUN_ATTEMPT} | jq -r '.Contents[].Key' | grep ${FILENAME} | sort --version-sort | tail -1 || true)
if [ -z "${S3_KEY}" ]; then
if [ "${SKIP_IF_DOES_NOT_EXIST}" = "true" ]; then
echo '::set-output name=SKIPPED::true'
exit 0
else
echo 2>&1 "Neither s3://${BUCKET}/${PREFIX}/${GITHUB_RUN_ATTEMPT}/${FILENAME} nor its version from previous attempts exist"
echo 2>&1 "Neither s3://${BUCKET}/${PREFIX}/${FILENAME} nor its version from previous attempts exist"
exit 1
fi
fi

View File

@@ -63,7 +63,39 @@ runs:
\"name\": \"Created by actions/neon-project-create; GITHUB_RUN_ID=${GITHUB_RUN_ID}\",
\"platform_id\": \"aws\",
\"region_id\": \"${REGION_ID}\",
\"settings\": { }
\"settings\": {
\"constraint_exclusion\": \"off\",
\"cpu_index_tuple_cost\": \"1.637272286893718e+308\",
\"cpu_operator_cost\": \"1.79769e+308\",
\"cpu_tuple_cost\": \"1.6957485653186957e+308\",
\"cursor_tuple_fraction\": \"0.4685070675008615\",
\"enable_async_append\": \"off\",
\"enable_bitmapscan\": \"on\",
\"enable_gathermerge\": \"on\",
\"enable_hashagg\": \"on\",
\"enable_hashjoin\": \"off\",
\"enable_incremental_sort\": \"on\",
\"enable_indexonlyscan\": \"on\",
\"enable_indexscan\": \"on\",
\"enable_material\": \"off\",
\"enable_mergejoin\": \"off\",
\"enable_nestloop\": \"off\",
\"enable_parallel_append\": \"off\",
\"enable_parallel_hash\": \"off\",
\"enable_partition_pruning\": \"on\",
\"enable_partitionwise_aggregate\": \"off\",
\"enable_partitionwise_join\": \"on\",
\"enable_seqscan\": \"on\",
\"enable_sort\": \"off\",
\"enable_tidscan\": \"off\",
\"from_collapse_limit\": \"1594910280\",
\"min_parallel_table_scan_size\": \"0\",
\"parallel_setup_cost\": \"1.79769e+308\",
\"parallel_tuple_cost\": \"6.288448285557434e+307\",
\"plan_cache_mode\": \"force_custom_plan\",
\"random_page_cost\": \"2.468072863935744e+307\",
\"seq_page_cost\": \"0.0\"
}
}
}")

View File

@@ -127,7 +127,7 @@ runs:
# Wake up the cluster if we use remote neon instance
if [ "${{ inputs.build_type }}" = "remote" ] && [ -n "${BENCHMARK_CONNSTR}" ]; then
${POSTGRES_DISTRIB_DIR}/v14/bin/psql ${BENCHMARK_CONNSTR} -c "SELECT version();"
${POSTGRES_DISTRIB_DIR}/v${DEFAULT_PG_VERSION}/bin/psql ${BENCHMARK_CONNSTR} -c "SELECT version();"
fi
# Run the tests.

View File

@@ -7,6 +7,9 @@ inputs:
path:
description: "A directory or file to upload"
required: true
prefix:
description: "S3 prefix. Default is '${GITHUB_RUN_ID}/${GITHUB_RUN_ATTEMPT}'"
required: false
runs:
using: "composite"
@@ -42,14 +45,14 @@ runs:
env:
SOURCE: ${{ inputs.path }}
ARCHIVE: /tmp/uploads/${{ inputs.name }}.tar.zst
PREFIX: artifacts/${{ inputs.prefix || format('{0}/{1}', github.run_id, github.run_attempt) }}
run: |
BUCKET=neon-github-public-dev
PREFIX=artifacts/${GITHUB_RUN_ID}
FILENAME=$(basename $ARCHIVE)
FILESIZE=$(du -sh ${ARCHIVE} | cut -f1)
time aws s3 mv --only-show-errors ${ARCHIVE} s3://${BUCKET}/${PREFIX}/${GITHUB_RUN_ATTEMPT}/${FILENAME}
time aws s3 mv --only-show-errors ${ARCHIVE} s3://${BUCKET}/${PREFIX}/${FILENAME}
# Ref https://docs.github.com/en/actions/using-workflows/workflow-commands-for-github-actions#adding-a-job-summary
echo "[${FILENAME}](https://${BUCKET}.s3.amazonaws.com/${PREFIX}/${GITHUB_RUN_ATTEMPT}/${FILENAME}) ${FILESIZE}" >> ${GITHUB_STEP_SUMMARY}
echo "[${FILENAME}](https://${BUCKET}.s3.amazonaws.com/${PREFIX}/${FILENAME}) ${FILESIZE}" >> ${GITHUB_STEP_SUMMARY}

View File

@@ -58,23 +58,23 @@
creates: "/storage/pageserver/data/tenants"
environment:
NEON_REPO_DIR: "/storage/pageserver/data"
LD_LIBRARY_PATH: "/usr/local/lib"
LD_LIBRARY_PATH: "/usr/local/v14/lib"
become: true
tags:
- pageserver
# - name: update remote storage (s3) config
# lineinfile:
# path: /storage/pageserver/data/pageserver.toml
# line: "{{ item }}"
# loop:
# - "[remote_storage]"
# - "bucket_name = '{{ bucket_name }}'"
# - "bucket_region = '{{ bucket_region }}'"
# - "prefix_in_bucket = '{{ inventory_hostname }}'"
# become: true
# tags:
# - pageserver
- name: update remote storage (s3) config
lineinfile:
path: /storage/pageserver/data/pageserver.toml
line: "{{ item }}"
loop:
- "[remote_storage]"
- "bucket_name = '{{ bucket_name }}'"
- "bucket_region = '{{ bucket_region }}'"
- "prefix_in_bucket = '{{ inventory_hostname }}'"
become: true
tags:
- pageserver
- name: upload systemd service definition
ansible.builtin.template:
@@ -87,15 +87,15 @@
tags:
- pageserver
# - name: start systemd service
# ansible.builtin.systemd:
# daemon_reload: yes
# name: pageserver
# enabled: yes
# state: restarted
# become: true
# tags:
# - pageserver
- name: start systemd service
ansible.builtin.systemd:
daemon_reload: yes
name: pageserver
enabled: yes
state: restarted
become: true
tags:
- pageserver
- name: post version to console
when: console_mgmt_base_url is defined
@@ -132,7 +132,7 @@
creates: "/storage/safekeeper/data/safekeeper.id"
environment:
NEON_REPO_DIR: "/storage/safekeeper/data"
LD_LIBRARY_PATH: "/usr/local/lib"
LD_LIBRARY_PATH: "/usr/local/v14/lib"
become: true
tags:
- safekeeper

View File

@@ -21,10 +21,14 @@ docker pull --quiet neondatabase/neon:${DOCKER_TAG}
ID=$(docker create neondatabase/neon:${DOCKER_TAG})
docker cp ${ID}:/data/postgres_install.tar.gz .
tar -xzf postgres_install.tar.gz -C neon_install
mkdir neon_install/bin/
docker cp ${ID}:/usr/local/bin/pageserver neon_install/bin/
docker cp ${ID}:/usr/local/bin/safekeeper neon_install/bin/
docker cp ${ID}:/usr/local/bin/proxy neon_install/bin/
docker cp ${ID}:/usr/local/bin/postgres neon_install/bin/
docker cp ${ID}:/usr/local/v14/bin/ neon_install/v14/bin/
docker cp ${ID}:/usr/local/v15/bin/ neon_install/v15/bin/
docker cp ${ID}:/usr/local/v14/lib/ neon_install/v14/lib/
docker cp ${ID}:/usr/local/v15/lib/ neon_install/v15/lib/
docker rm -vf ${ID}
# store version to file (for ansible playbooks) and create binaries tarball

View File

@@ -3,11 +3,15 @@
zenith-us-stage-ps-2 console_region_id=27
zenith-us-stage-ps-3 console_region_id=27
zenith-us-stage-ps-4 console_region_id=27
zenith-us-stage-test-ps-1 console_region_id=28
[safekeepers]
zenith-us-stage-sk-4 console_region_id=27
zenith-us-stage-sk-5 console_region_id=27
zenith-us-stage-sk-6 console_region_id=27
zenith-us-stage-test-sk-1 console_region_id=28
zenith-us-stage-test-sk-2 console_region_id=28
zenith-us-stage-test-sk-3 console_region_id=28
[storage:children]
pageservers

View File

@@ -5,7 +5,7 @@ After=network.target auditd.service
[Service]
Type=simple
User=pageserver
Environment=RUST_BACKTRACE=1 NEON_REPO_DIR=/storage/pageserver LD_LIBRARY_PATH=/usr/local/lib
Environment=RUST_BACKTRACE=1 NEON_REPO_DIR=/storage/pageserver LD_LIBRARY_PATH=/usr/local/v14/lib
ExecStart=/usr/local/bin/pageserver -c "pg_distrib_dir='/usr/local'" -c "listen_pg_addr='0.0.0.0:6400'" -c "listen_http_addr='0.0.0.0:9898'" -c "broker_endpoints=['{{ etcd_endpoints }}']" -D /storage/pageserver/data
ExecReload=/bin/kill -HUP $MAINPID
KillMode=mixed

View File

@@ -5,7 +5,7 @@ After=network.target auditd.service
[Service]
Type=simple
User=safekeeper
Environment=RUST_BACKTRACE=1 NEON_REPO_DIR=/storage/safekeeper/data LD_LIBRARY_PATH=/usr/local/lib
Environment=RUST_BACKTRACE=1 NEON_REPO_DIR=/storage/safekeeper/data LD_LIBRARY_PATH=/usr/local/v14/lib
ExecStart=/usr/local/bin/safekeeper -l {{ inventory_hostname }}.local:6500 --listen-http {{ inventory_hostname }}.local:7676 -D /storage/safekeeper/data --broker-endpoints={{ etcd_endpoints }} --remote-storage='{bucket_name="{{bucket_name}}", bucket_region="{{bucket_region}}", prefix_in_bucket="{{ env_name }}/wal"}'
ExecReload=/bin/kill -HUP $MAINPID
KillMode=mixed

View File

@@ -46,7 +46,8 @@ jobs:
runs-on: [self-hosted, zenith-benchmarker]
env:
POSTGRES_DISTRIB_DIR: "/usr/pgsql-14"
POSTGRES_DISTRIB_DIR: /tmp/pg_install
DEFAULT_PG_VERSION: 14
steps:
- name: Checkout zenith repo
@@ -71,7 +72,7 @@ jobs:
echo Poetry
poetry --version
echo Pgbench
$POSTGRES_DISTRIB_DIR/bin/pgbench --version
${POSTGRES_DISTRIB_DIR}/v${DEFAULT_PG_VERSION}/bin/pgbench --version
- name: Create Neon Project
id: create-neon-project
@@ -140,7 +141,8 @@ jobs:
env:
TEST_PG_BENCH_DURATIONS_MATRIX: "60m"
TEST_PG_BENCH_SCALES_MATRIX: "10gb"
POSTGRES_DISTRIB_DIR: /usr
POSTGRES_DISTRIB_DIR: /tmp/neon/pg_install
DEFAULT_PG_VERSION: 14
TEST_OUTPUT: /tmp/test_output
BUILD_TYPE: remote
SAVE_PERF_REPORT: ${{ github.event.inputs.save_perf_report || ( github.ref == 'refs/heads/main' ) }}
@@ -151,7 +153,7 @@ jobs:
# neon-captest-new: Run pgbench in a freshly created project
# neon-captest-reuse: Same, but reusing existing project
# neon-captest-prefetch: Same, with prefetching enabled (new project)
platform: [ neon-captest-new, neon-captest-reuse, neon-captest-prefetch, rds-aurora ]
platform: [ neon-captest-prefetch ]
runs-on: dev
container:
@@ -163,10 +165,17 @@ jobs:
steps:
- uses: actions/checkout@v3
- name: Install Deps
- name: Download Neon artifact
uses: ./.github/actions/download
with:
name: neon-${{ runner.os }}-release-artifact
path: /tmp/neon/
prefix: latest
- name: Add Postgres binaries to PATH
run: |
sudo apt -y update
sudo apt install -y postgresql-14
${POSTGRES_DISTRIB_DIR}/v${DEFAULT_PG_VERSION}/bin/pgbench --version
echo "${POSTGRES_DISTRIB_DIR}/v${DEFAULT_PG_VERSION}/bin" >> $GITHUB_PATH
- name: Create Neon Project
if: matrix.platform != 'neon-captest-reuse'
@@ -204,8 +213,24 @@ jobs:
- name: Set database options
if: matrix.platform == 'neon-captest-prefetch'
run: |
psql ${BENCHMARK_CONNSTR} -c "ALTER DATABASE main SET geqo_seed=1.0"
psql ${BENCHMARK_CONNSTR} -c "ALTER DATABASE main SET geqo_effort=10"
psql ${BENCHMARK_CONNSTR} -c "ALTER DATABASE main SET seqscan_prefetch_buffers=647"
psql ${BENCHMARK_CONNSTR} -c "ALTER DATABASE main SET default_statistics_target=1"
psql ${BENCHMARK_CONNSTR} -c "ALTER DATABASE main SET jit_above_cost=-1.0"
psql ${BENCHMARK_CONNSTR} -c "ALTER DATABASE main SET join_collapse_limit=1"
psql ${BENCHMARK_CONNSTR} -c "ALTER DATABASE main SET geqo_selection_bias=2.0"
psql ${BENCHMARK_CONNSTR} -c "ALTER DATABASE main SET geqo_pool_size=1574739104"
psql ${BENCHMARK_CONNSTR} -c "ALTER DATABASE main SET geqo_generations=2147483647"
psql ${BENCHMARK_CONNSTR} -c "ALTER DATABASE main SET enable_memoize=off"
psql ${BENCHMARK_CONNSTR} -c "ALTER DATABASE main SET jit_inline_above_cost=-1.0"
psql ${BENCHMARK_CONNSTR} -c "ALTER DATABASE main SET min_parallel_index_scan_size=0"
psql ${BENCHMARK_CONNSTR} -c "ALTER DATABASE main SET geqo_threshold=2"
psql ${BENCHMARK_CONNSTR} -c "ALTER DATABASE main SET enable_seqscan_prefetch=on"
psql ${BENCHMARK_CONNSTR} -c "ALTER DATABASE main SET seqscan_prefetch_buffers=10"
psql ${BENCHMARK_CONNSTR} -c "ALTER DATABASE main SET jit=off"
psql ${BENCHMARK_CONNSTR} -c "ALTER DATABASE main SET jit_optimize_above_cost=1.79769e+308"
psql ${BENCHMARK_CONNSTR} -c "ALTER DATABASE main SET geqo=on"
psql ${BENCHMARK_CONNSTR} -c "ALTER DATABASE main SET effective_cache_size=872767456"
env:
BENCHMARK_CONNSTR: ${{ steps.set-up-connstr.outputs.connstr }}

View File

@@ -268,6 +268,32 @@ jobs:
if: matrix.build_type == 'debug'
uses: ./.github/actions/save-coverage-data
upload-latest-artifacts:
runs-on: dev
container:
image: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/rust:pinned
options: --init
needs: [ regress-tests ]
if: github.ref_name == 'main'
steps:
- name: Copy Neon artifact to the latest directory
shell: bash -euxo pipefail {0}
env:
BUCKET: neon-github-public-dev
PREFIX: artifacts/${{ github.run_id }}
run: |
for build_type in debug release; do
FILENAME=neon-${{ runner.os }}-${build_type}-artifact.tar.zst
S3_KEY=$(aws s3api list-objects-v2 --bucket ${BUCKET} --prefix ${PREFIX} | jq -r '.Contents[].Key' | grep ${FILENAME} | sort --version-sort | tail -1 || true)
if [ -z "${S3_KEY}" ]; then
echo 2>&1 "Neither s3://${BUCKET}/${PREFIX}/${FILENAME} nor its version from previous attempts exist"
exit 1
fi
time aws s3 cp --only-show-errors s3://${BUCKET}/${S3_KEY} s3://${BUCKET}/artifacts/latest/${FILENAME}
done
benchmarks:
runs-on: dev
container:
@@ -335,9 +361,6 @@ jobs:
curl --fail --output suites.json ${REPORT_URL%/index.html}/data/suites.json
./scripts/pysync
# Workaround for https://github.com/neondatabase/cloud/issues/2188
psql "$TEST_RESULT_CONNSTR" -c "SELECT 1;" || sleep 10
DATABASE_URL="$TEST_RESULT_CONNSTR" poetry run python3 scripts/ingest_regress_test_result.py --revision ${SHA} --reference ${GITHUB_REF} --build-type ${BUILD_TYPE} --ingest suites.json
coverage-report:
@@ -588,7 +611,16 @@ jobs:
- name: Pull rust image from ECR
run: crane pull 369495373322.dkr.ecr.eu-central-1.amazonaws.com/rust:pinned rust
- name: Configure docker login
- name: Push images to production ECR
if: |
(github.ref_name == 'main' || github.ref_name == 'release') &&
github.event_name != 'workflow_dispatch'
run: |
crane copy 369495373322.dkr.ecr.eu-central-1.amazonaws.com/neon:$GITHUB_RUN_ID 093970136003.dkr.ecr.us-east-2.amazonaws.com/neon:latest
crane copy 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-tools:$GITHUB_RUN_ID 093970136003.dkr.ecr.us-east-2.amazonaws.com/compute-tools:latest
crane copy 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node:$GITHUB_RUN_ID 093970136003.dkr.ecr.us-east-2.amazonaws.com/compute-node:latest
- name: Configure Docker Hub login
run: |
# ECR Credential Helper & Docker Hub don't work together in config, hence reset
echo "" > /github/home/.docker/config.json
@@ -609,7 +641,7 @@ jobs:
- name: Push rust image to Docker Hub
run: crane push rust neondatabase/rust:pinned
- name: Add latest tag to images
- name: Add latest tag to images in Docker Hub
if: |
(github.ref_name == 'main' || github.ref_name == 'release') &&
github.event_name != 'workflow_dispatch'

View File

@@ -19,9 +19,8 @@ COPY --chown=nonroot scripts/ninstall.sh scripts/ninstall.sh
ENV BUILD_TYPE release
RUN set -e \
&& mold -run make -j $(nproc) -s neon-pg-ext \
&& rm -rf pg_install/v14/build \
&& rm -rf pg_install/v15/build \
&& tar -C pg_install/v14 -czf /home/nonroot/postgres_install.tar.gz .
&& rm -rf pg_install/build \
&& tar -C pg_install -czf /home/nonroot/postgres_install.tar.gz .
# Build neon binaries
FROM $REPOSITORY/$IMAGE:$TAG AS build

View File

@@ -8,9 +8,12 @@ ARG TAG=pinned
# Layer "build-deps"
#
FROM debian:bullseye-slim AS build-deps
RUN echo "deb http://ftp.debian.org/debian testing main" >> /etc/apt/sources.list && \
echo "APT::Default-Release \"stable\";" > /etc/apt/apt.conf.d/default-release && \
apt update
RUN apt update && \
apt install -y git autoconf automake libtool build-essential bison flex libreadline-dev zlib1g-dev libxml2-dev \
libcurl4-openssl-dev libossp-uuid-dev
libcurl4-openssl-dev libossp-uuid-dev wget pkg-config libglib2.0-dev
#
# Layer "pg-build"
@@ -37,7 +40,7 @@ RUN cd postgres && \
FROM build-deps AS postgis-build
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
RUN apt update && \
apt install -y gdal-bin libgdal-dev libprotobuf-c-dev protobuf-c-compiler xsltproc wget
apt install -y gdal-bin libgdal-dev libprotobuf-c-dev protobuf-c-compiler xsltproc
RUN wget https://download.osgeo.org/postgis/source/postgis-3.3.0.tar.gz && \
tar xvzf postgis-3.3.0.tar.gz && \
@@ -59,15 +62,13 @@ RUN wget https://download.osgeo.org/postgis/source/postgis-3.3.0.tar.gz && \
# Build plv8
#
FROM build-deps AS plv8-build
COPY --from=postgis-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
RUN apt update && \
apt install -y git curl wget make ninja-build build-essential libncurses5 python3-dev pkg-config libc++-dev libc++abi-dev libglib2.0-dev
apt install -y ninja-build python3-dev libc++-dev libc++abi-dev libncurses5
# https://github.com/plv8/plv8/issues/475
# Debian bullseye provides binutils 2.35 when >= 2.38 is necessary
RUN echo "deb http://ftp.debian.org/debian testing main" >> /etc/apt/sources.list && \
echo "APT::Default-Release \"stable\";" > /etc/apt/apt.conf.d/default-release && \
apt update && \
RUN apt update && \
apt install -y --no-install-recommends -t testing binutils
RUN wget https://github.com/plv8/plv8/archive/refs/tags/v3.1.4.tar.gz && \
@@ -79,12 +80,46 @@ RUN wget https://github.com/plv8/plv8/archive/refs/tags/v3.1.4.tar.gz && \
rm -rf /plv8-* && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/plv8.control
#
# Layer "h3-pg-build"
# Build h3_pg
#
FROM build-deps AS h3-pg-build
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
# packaged cmake is too old
RUN apt update && \
apt install -y --no-install-recommends -t testing cmake
RUN wget https://github.com/uber/h3/archive/refs/tags/v4.0.1.tar.gz -O h3.tgz && \
tar xvzf h3.tgz && \
cd h3-4.0.1 && \
mkdir build && \
cd build && \
cmake .. -DCMAKE_BUILD_TYPE=Release && \
make -j $(getconf _NPROCESSORS_ONLN) && \
DESTDIR=/h3 make install && \
cp -R /h3/usr / && \
rm -rf build
RUN wget https://github.com/zachasme/h3-pg/archive/refs/tags/v4.0.1.tar.gz -O h3-pg.tgz && \
tar xvzf h3-pg.tgz && \
cd h3-pg-4.0.1 && \
export PATH="/usr/local/pgsql/bin:$PATH" && \
make -j $(getconf _NPROCESSORS_ONLN) && \
make -j $(getconf _NPROCESSORS_ONLN) install && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/h3.control
#
# Layer "neon-pg-ext-build"
# compile neon extensions
#
FROM build-deps AS neon-pg-ext-build
COPY --from=postgis-build /usr/local/pgsql/ /usr/local/pgsql/
# plv8 still sometimes crashes during the creation
# COPY --from=plv8-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=h3-pg-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=h3-pg-build /h3/usr /
COPY pgxn/ pgxn/
RUN make -j $(getconf _NPROCESSORS_ONLN) \
@@ -132,8 +167,6 @@ RUN mkdir /var/db && useradd -m -d /var/db/postgres postgres && \
chmod 0750 /var/db/postgres/compute && \
echo '/usr/local/lib' >> /etc/ld.so.conf && /sbin/ldconfig
# TODO: Check if we can make the extension setup more modular versus a linear build
# currently plv8-build copies the output /usr/local/pgsql from postgis-build, etc#
COPY --from=postgres-cleanup-layer --chown=postgres /usr/local/pgsql /usr/local
COPY --from=compute-tools --chown=postgres /home/nonroot/target/release-line-debug-size-lto/compute_ctl /usr/local/bin/compute_ctl

View File

@@ -170,7 +170,7 @@ pub fn find_end_of_wal(
let mut curr_lsn = start_lsn;
let mut buf = [0u8; XLOG_BLCKSZ];
let pg_version = PG_MAJORVERSION[1..3].parse::<u32>().unwrap();
info!("find_end_of_wal PG_VERSION: {}", pg_version);
debug!("find_end_of_wal PG_VERSION: {}", pg_version);
let mut decoder = WalStreamDecoder::new(start_lsn, pg_version);
@@ -182,7 +182,7 @@ pub fn find_end_of_wal(
match open_wal_segment(&seg_file_path)? {
None => {
// no more segments
info!(
debug!(
"find_end_of_wal reached end at {:?}, segment {:?} doesn't exist",
result, seg_file_path
);
@@ -205,7 +205,7 @@ pub fn find_end_of_wal(
match decoder.poll_decode() {
Ok(Some(record)) => result = record.0,
Err(e) => {
info!(
debug!(
"find_end_of_wal reached end at {:?}, decode error: {:?}",
result, e
);

View File

@@ -240,7 +240,6 @@ where
mod tests {
use super::*;
use std::sync::Arc;
use std::thread::sleep;
use std::time::Duration;
impl MonotonicCounter<i32> for i32 {
@@ -258,17 +257,19 @@ mod tests {
let seq = Arc::new(SeqWait::new(0));
let seq2 = Arc::clone(&seq);
let seq3 = Arc::clone(&seq);
tokio::task::spawn(async move {
let jh1 = tokio::task::spawn(async move {
seq2.wait_for(42).await.expect("wait_for 42");
let old = seq2.advance(100);
assert_eq!(old, 99);
seq2.wait_for(999).await.expect_err("no 999");
seq2.wait_for_timeout(999, Duration::from_millis(100))
.await
.expect_err("no 999");
});
tokio::task::spawn(async move {
let jh2 = tokio::task::spawn(async move {
seq3.wait_for(42).await.expect("wait_for 42");
seq3.wait_for(0).await.expect("wait_for 0");
});
sleep(Duration::from_secs(1));
tokio::time::sleep(Duration::from_millis(200)).await;
let old = seq.advance(99);
assert_eq!(old, 0);
seq.wait_for(100).await.expect("wait_for 100");
@@ -277,6 +278,9 @@ mod tests {
assert_eq!(seq.advance(98), 100);
assert_eq!(seq.load(), 100);
jh1.await.unwrap();
jh2.await.unwrap();
seq.shutdown();
}
@@ -284,15 +288,18 @@ mod tests {
async fn seqwait_timeout() {
let seq = Arc::new(SeqWait::new(0));
let seq2 = Arc::clone(&seq);
tokio::task::spawn(async move {
let jh = tokio::task::spawn(async move {
let timeout = Duration::from_millis(1);
let res = seq2.wait_for_timeout(42, timeout).await;
assert_eq!(res, Err(SeqWaitError::Timeout));
});
tokio::time::sleep(Duration::from_secs(1)).await;
tokio::time::sleep(Duration::from_millis(200)).await;
// This will attempt to wake, but nothing will happen
// because the waiter already dropped its Receiver.
let old = seq.advance(99);
assert_eq!(old, 0)
assert_eq!(old, 0);
jh.await.unwrap();
seq.shutdown();
}
}
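For reference, the reason the tests now store jh1/jh2 and await them: a failed assertion inside a spawned task only panics that task, and is invisible to the test unless its JoinHandle is awaited. A minimal standalone sketch (assuming tokio with the macros feature):

#[tokio::test]
async fn join_handle_surfaces_panics() {
    let jh = tokio::task::spawn(async {
        assert_eq!(1 + 1, 3); // panics inside the spawned task
    });
    // Without this await, the test would pass despite the failed assertion.
    assert!(jh.await.unwrap_err().is_panic());
}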

View File

@@ -1,7 +0,0 @@
2022-09-22 12:07:18.140 EEST [463605] LOG: starting PostgreSQL 15beta3 on x86_64-pc-linux-gnu, compiled by gcc (Ubuntu 9.4.0-1ubuntu1~20.04.1) 9.4.0, 64-bit
2022-09-22 12:07:18.140 EEST [463605] LOG: listening on IPv4 address "127.0.0.1", port 15331
2022-09-22 12:07:18.142 EEST [463605] LOG: listening on Unix socket "/tmp/.s.PGSQL.15331"
2022-09-22 12:07:18.145 EEST [463608] LOG: database system was shut down at 2022-09-22 12:07:17 EEST
2022-09-22 12:07:18.149 EEST [463605] LOG: database system is ready to accept connections
2022-09-22 12:07:18.211 EEST [463605] LOG: received immediate shutdown request
2022-09-22 12:07:18.218 EEST [463605] LOG: database system is shut down

View File

@@ -1,8 +1,9 @@
use metrics::core::{AtomicU64, GenericCounter};
use metrics::{
register_histogram, register_histogram_vec, register_int_counter, register_int_counter_vec,
register_int_gauge, register_int_gauge_vec, register_uint_gauge_vec, Histogram, HistogramVec,
IntCounter, IntCounterVec, IntGauge, IntGaugeVec, UIntGauge, UIntGaugeVec,
register_gauge_vec, register_histogram, register_histogram_vec, register_int_counter,
register_int_counter_vec, register_int_gauge, register_int_gauge_vec, register_uint_gauge_vec,
GaugeVec, Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, UIntGauge,
UIntGaugeVec,
};
use once_cell::sync::Lazy;
use utils::id::{TenantId, TimelineId};
@@ -204,12 +205,34 @@ pub static REMAINING_SYNC_ITEMS: Lazy<IntGauge> = Lazy::new(|| {
.expect("failed to register pageserver remote storage remaining sync items int gauge")
});
pub static IMAGE_SYNC_TIME: Lazy<HistogramVec> = Lazy::new(|| {
pub static IMAGE_SYNC_TIME: Lazy<GaugeVec> = Lazy::new(|| {
register_gauge_vec!(
"pageserver_remote_storage_image_sync_duration",
"Time spent to synchronize (up/download) a whole pageserver image",
&["tenant_id", "timeline_id"],
)
.expect("failed to register per-timeline pageserver image sync time vec")
});
pub static IMAGE_SYNC_OPERATION_KINDS: &[&str] = &["upload", "download", "delete"];
pub static IMAGE_SYNC_STATUS: &[&str] = &["success", "failure", "abort"];
pub static IMAGE_SYNC_COUNT: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"pageserver_remote_storage_image_sync_count",
"Number of synchronization operations executed for pageserver images. \
Grouped by tenant, timeline, operation_kind and status",
&["tenant_id", "timeline_id", "operation_kind", "status"]
)
.expect("failed to register pageserver image sync count vec")
});
pub static IMAGE_SYNC_TIME_HISTOGRAM: Lazy<HistogramVec> = Lazy::new(|| {
register_histogram_vec!(
"pageserver_remote_storage_image_sync_seconds",
"Time took to synchronize (download or upload) a whole pageserver image. \
Grouped by tenant and timeline ids, `operation_kind` (upload|download) and `status` (success|failure)",
&["tenant_id", "timeline_id", "operation_kind", "status"],
Grouped by operation_kind and status",
&["operation_kind", "status"],
vec![0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 3.0, 10.0, 20.0]
)
.expect("failed to register pageserver image sync time histogram vec")
@@ -256,7 +279,7 @@ macro_rules! redo_histogram_time_buckets {
() => {
vec![
0.000_005, 0.000_010, 0.000_025, 0.000_050, 0.000_100, 0.000_250, 0.000_500, 0.001_000,
0.002_500, 0.005_000, 0.010_000, 0.025_000, 0.050_000,
0.002_500, 0.005_000, 0.010_000, 0.025_000, 0.050_000, 0.100_000, 0.250_000,
]
};
}
@@ -411,6 +434,14 @@ impl Drop for TimelineMetrics {
for op in SMGR_QUERY_TIME_OPERATIONS {
let _ = SMGR_QUERY_TIME.remove_label_values(&[op, tenant_id, timeline_id]);
}
for op in IMAGE_SYNC_OPERATION_KINDS {
for status in IMAGE_SYNC_STATUS {
let _ = IMAGE_SYNC_COUNT.remove_label_values(&[tenant_id, timeline_id, op, status]);
}
}
let _ = IMAGE_SYNC_TIME.remove_label_values(&[tenant_id, timeline_id]);
}
}

View File

@@ -667,7 +667,7 @@ impl PageServerHandler {
Ok(lsn)
}
#[instrument(skip(timeline, req), fields(rel = %req.rel, req_lsn = %req.lsn))]
#[instrument(skip(self, timeline, req), fields(rel = %req.rel, req_lsn = %req.lsn))]
async fn handle_get_rel_exists_request(
&self,
timeline: &Timeline,
@@ -684,7 +684,7 @@ impl PageServerHandler {
}))
}
#[instrument(skip(timeline, req), fields(rel = %req.rel, req_lsn = %req.lsn))]
#[instrument(skip(self, timeline, req), fields(rel = %req.rel, req_lsn = %req.lsn))]
async fn handle_get_nblocks_request(
&self,
timeline: &Timeline,
@@ -701,7 +701,7 @@ impl PageServerHandler {
}))
}
#[instrument(skip(timeline, req), fields(dbnode = %req.dbnode, req_lsn = %req.lsn))]
#[instrument(skip(self, timeline, req), fields(dbnode = %req.dbnode, req_lsn = %req.lsn))]
async fn handle_db_size_request(
&self,
timeline: &Timeline,
@@ -721,7 +721,7 @@ impl PageServerHandler {
}))
}
#[instrument(skip(timeline, req), fields(rel = %req.rel, blkno = %req.blkno, req_lsn = %req.lsn))]
#[instrument(skip(self, timeline, req), fields(rel = %req.rel, blkno = %req.blkno, req_lsn = %req.lsn))]
async fn handle_get_page_at_lsn_request(
&self,
timeline: &Timeline,
@@ -1023,6 +1023,9 @@ impl postgres_backend_async::Handler for PageServerHandler {
let params = params_raw.split(' ').collect::<Vec<_>>();
ensure!(params.len() == 1, "invalid param number for config command");
let tenant_id = TenantId::from_str(params[0])?;
self.check_permission(Some(tenant_id))?;
let tenant = tenant_mgr::get_tenant(tenant_id, true)?;
pgb.write_message(&BeMessage::RowDescription(&[
RowDescriptor::int8_col(b"checkpoint_distance"),
@@ -1067,14 +1070,14 @@ impl postgres_backend_async::Handler for PageServerHandler {
let caps = re
.captures(query_string)
.with_context(|| format!("invalid get_lsn_by_timestamp: '{}'", query_string))?;
let tenant_id = TenantId::from_str(caps.get(1).unwrap().as_str())?;
let timeline_id = TimelineId::from_str(caps.get(2).unwrap().as_str())?;
let timeline = get_local_timeline(tenant_id, timeline_id)?;
let timestamp = humantime::parse_rfc3339(caps.get(3).unwrap().as_str())?;
let timestamp_pg = to_pg_timestamp(timestamp);
self.check_permission(Some(tenant_id))?;
let timeline = get_local_timeline(tenant_id, timeline_id)?;
pgb.write_message(&BeMessage::RowDescription(&[RowDescriptor::text_col(
b"lsn",
)]))?;

View File

@@ -178,6 +178,7 @@ use crate::{
TenantTimelineValues,
};
use crate::metrics::{IMAGE_SYNC_COUNT, IMAGE_SYNC_TIME_HISTOGRAM};
use utils::id::{TenantId, TenantTimelineId, TimelineId};
use self::download::download_index_parts;
@@ -835,7 +836,6 @@ async fn process_sync_task_batch(
sync_id,
upload_data,
sync_start,
"upload",
)
.await
}
@@ -879,7 +879,6 @@ async fn process_sync_task_batch(
sync_id,
download_data,
sync_start,
"download",
)
.await;
}
@@ -911,7 +910,6 @@ async fn process_sync_task_batch(
sync_id,
delete_data,
sync_start,
"delete",
)
.instrument(info_span!("delete_timeline_data"))
.await;
@@ -948,8 +946,9 @@ async fn download_timeline_data(
sync_id: TenantTimelineId,
new_download_data: SyncData<LayersDownload>,
sync_start: Instant,
task_name: &str,
) -> DownloadStatus {
static TASK_NAME: &str = "download";
match download_timeline_layers(
conf,
storage,
@@ -961,19 +960,19 @@ async fn download_timeline_data(
.await
{
DownloadedTimeline::Abort => {
register_sync_status(sync_id, sync_start, task_name, None);
register_sync_status(sync_id, sync_start, TASK_NAME, None);
if let Err(e) = index.write().await.set_awaits_download(&sync_id, false) {
error!("Timeline {sync_id} was expected to be in the remote index after a download attempt, but it's absent: {e:?}");
}
}
DownloadedTimeline::FailedAndRescheduled => {
register_sync_status(sync_id, sync_start, task_name, Some(false));
register_sync_status(sync_id, sync_start, TASK_NAME, Some(false));
}
DownloadedTimeline::Successful(mut download_data) => {
match update_local_metadata(conf, sync_id, current_remote_timeline).await {
Ok(()) => match index.write().await.set_awaits_download(&sync_id, false) {
Ok(()) => {
register_sync_status(sync_id, sync_start, task_name, Some(true));
register_sync_status(sync_id, sync_start, TASK_NAME, Some(true));
return DownloadStatus::Downloaded;
}
Err(e) => {
@@ -984,7 +983,7 @@ async fn download_timeline_data(
error!("Failed to update local timeline metadata: {e:?}");
download_data.retries += 1;
sync_queue.push(sync_id, SyncTask::Download(download_data));
register_sync_status(sync_id, sync_start, task_name, Some(false));
register_sync_status(sync_id, sync_start, TASK_NAME, Some(false));
}
}
}
@@ -1060,8 +1059,9 @@ async fn delete_timeline_data(
sync_id: TenantTimelineId,
mut new_delete_data: SyncData<LayersDeletion>,
sync_start: Instant,
task_name: &str,
) {
static TASK_NAME: &str = "delete";
let timeline_delete = &mut new_delete_data.data;
if !timeline_delete.deletion_registered {
@@ -1077,14 +1077,14 @@ async fn delete_timeline_data(
error!("Failed to update remote timeline {sync_id}: {e:?}");
new_delete_data.retries += 1;
sync_queue.push(sync_id, SyncTask::Delete(new_delete_data));
register_sync_status(sync_id, sync_start, task_name, Some(false));
register_sync_status(sync_id, sync_start, TASK_NAME, Some(false));
return;
}
}
timeline_delete.deletion_registered = true;
let sync_status = delete_timeline_layers(storage, sync_queue, sync_id, new_delete_data).await;
register_sync_status(sync_id, sync_start, task_name, Some(sync_status));
register_sync_status(sync_id, sync_start, TASK_NAME, Some(sync_status));
}
async fn read_metadata_file(metadata_path: &Path) -> anyhow::Result<TimelineMetadata> {
@@ -1103,8 +1103,8 @@ async fn upload_timeline_data(
sync_id: TenantTimelineId,
new_upload_data: SyncData<LayersUpload>,
sync_start: Instant,
task_name: &str,
) -> UploadStatus {
static TASK_NAME: &str = "upload";
let mut uploaded_data = match upload_timeline_layers(
storage,
sync_queue,
@@ -1115,7 +1115,7 @@ async fn upload_timeline_data(
.await
{
UploadedTimeline::FailedAndRescheduled(e) => {
register_sync_status(sync_id, sync_start, task_name, Some(false));
register_sync_status(sync_id, sync_start, TASK_NAME, Some(false));
return UploadStatus::Failed(e);
}
UploadedTimeline::Successful(upload_data) => upload_data,
@@ -1134,14 +1134,14 @@ async fn upload_timeline_data(
.await
{
Ok(()) => {
register_sync_status(sync_id, sync_start, task_name, Some(true));
register_sync_status(sync_id, sync_start, TASK_NAME, Some(true));
UploadStatus::Uploaded
}
Err(e) => {
error!("Failed to update remote timeline {sync_id}: {e:?}");
uploaded_data.retries += 1;
sync_queue.push(sync_id, SyncTask::Upload(uploaded_data));
register_sync_status(sync_id, sync_start, task_name, Some(false));
register_sync_status(sync_id, sync_start, TASK_NAME, Some(false));
UploadStatus::Failed(e)
}
}
@@ -1391,16 +1391,22 @@ fn register_sync_status(
let tenant_id = sync_id.tenant_id.to_string();
let timeline_id = sync_id.timeline_id.to_string();
match sync_status {
Some(true) => {
IMAGE_SYNC_TIME.with_label_values(&[&tenant_id, &timeline_id, sync_name, "success"])
}
Some(false) => {
IMAGE_SYNC_TIME.with_label_values(&[&tenant_id, &timeline_id, sync_name, "failure"])
}
None => return,
}
.observe(secs_elapsed)
let sync_status = match sync_status {
Some(true) => "success",
Some(false) => "failure",
None => "abort",
};
IMAGE_SYNC_TIME_HISTOGRAM
.with_label_values(&[sync_name, sync_status])
.observe(secs_elapsed);
IMAGE_SYNC_TIME
.with_label_values(&[&tenant_id, &timeline_id])
.add(secs_elapsed);
IMAGE_SYNC_COUNT
.with_label_values(&[&tenant_id, &timeline_id, sync_name, sync_status])
.inc();
}
#[cfg(test)]

View File

@@ -17,7 +17,6 @@ use tracing::*;
use utils::crashsafe_dir::path_with_suffix_extension;
use std::cmp::min;
use std::collections::hash_map;
use std::collections::hash_map::Entry;
use std::collections::BTreeSet;
use std::collections::HashMap;
@@ -246,12 +245,12 @@ impl Tenant {
let ancestor_ancestor_lsn = ancestor_timeline.get_ancestor_lsn();
if ancestor_ancestor_lsn > *lsn {
// can we safely just branch from the ancestor instead?
anyhow::bail!(
"invalid start lsn {} for ancestor timeline {}: less than timeline ancestor lsn {}",
lsn,
ancestor_timeline_id,
ancestor_ancestor_lsn,
);
bail!(
"invalid start lsn {} for ancestor timeline {}: less than timeline ancestor lsn {}",
lsn,
ancestor_timeline_id,
ancestor_ancestor_lsn,
);
}
}
@@ -406,11 +405,11 @@ impl Tenant {
.with_context(|| format!("Failed to initialize timeline {timeline_id}"))?;
match timelines_accessor.entry(timeline.timeline_id) {
hash_map::Entry::Occupied(_) => anyhow::bail!(
Entry::Occupied(_) => bail!(
"Found freshly initialized timeline {} in the tenant map",
timeline.timeline_id
),
hash_map::Entry::Vacant(v) => {
Entry::Vacant(v) => {
v.insert(timeline);
}
}
@@ -768,7 +767,7 @@ impl Tenant {
})
.with_context(|| {
format!(
"Failed to fsync on firts save for config {}",
"Failed to fsync on first save for config {}",
target_config_path.display()
)
})?;
@@ -1091,11 +1090,11 @@ impl Tenant {
})?;
match timelines.entry(new_timeline_id) {
hash_map::Entry::Occupied(_) => anyhow::bail!(
Entry::Occupied(_) => bail!(
"Found freshly initialized timeline {} in the tenant map",
new_timeline_id
),
hash_map::Entry::Vacant(v) => {
Entry::Vacant(v) => {
v.insert(Arc::clone(&new_timeline));
}
}

View File

@@ -343,7 +343,9 @@ impl Timeline {
match cached_lsn.cmp(&lsn) {
Ordering::Less => {} // there might be WAL between cached_lsn and lsn, we need to check
Ordering::Equal => return Ok(cached_img), // exact LSN match, return the image
Ordering::Greater => panic!(), // the returned lsn should never be after the requested lsn
Ordering::Greater => {
unreachable!("the returned lsn should never be after the requested lsn")
}
}
Some((cached_lsn, cached_img))
}
@@ -726,10 +728,10 @@ impl Timeline {
Ok(())
}
pub fn layer_removal_guard(&self) -> Result<MutexGuard<()>, anyhow::Error> {
pub fn layer_removal_guard(&self) -> anyhow::Result<MutexGuard<()>> {
self.layer_removal_cs
.try_lock()
.map_err(|e| anyhow::anyhow!("cannot lock compaction critical section {e}"))
.map_err(|e| anyhow!("cannot lock compaction critical section {e}"))
}
/// Retrieve current logical size of the timeline.

View File

@@ -31,7 +31,6 @@ use etcd_broker::Client;
use itertools::Itertools;
use once_cell::sync::OnceCell;
use std::future::Future;
use std::sync::Arc;
use tokio::sync::watch;
use tracing::*;
use url::Url;
@@ -88,37 +87,44 @@ pub fn is_etcd_client_initialized() -> bool {
/// That may lead to certain events not being observed by the listener.
#[derive(Debug)]
pub struct TaskHandle<E> {
events_receiver: watch::Receiver<TaskEvent<E>>,
join_handle: Option<tokio::task::JoinHandle<anyhow::Result<()>>>,
events_receiver: watch::Receiver<TaskStateUpdate<E>>,
cancellation: watch::Sender<()>,
}
#[derive(Debug, Clone)]
pub enum TaskEvent<E> {
Update(TaskStateUpdate<E>),
End(anyhow::Result<()>),
}
#[derive(Debug, Clone)]
pub enum TaskStateUpdate<E> {
Init,
Started,
NewEvent(E),
End,
Progress(E),
}
impl<E: Clone> TaskHandle<E> {
/// Initializes the task, starting it immediately after the creation.
pub fn spawn<Fut>(
task: impl FnOnce(Arc<watch::Sender<TaskEvent<E>>>, watch::Receiver<()>) -> Fut + Send + 'static,
task: impl FnOnce(watch::Sender<TaskStateUpdate<E>>, watch::Receiver<()>) -> Fut
+ Send
+ 'static,
) -> Self
where
Fut: Future<Output = Result<(), String>> + Send,
E: Sync + Send + 'static,
Fut: Future<Output = anyhow::Result<()>> + Send,
E: Send + Sync + 'static,
{
let (cancellation, cancellation_receiver) = watch::channel(());
let (events_sender, events_receiver) = watch::channel(TaskEvent::Started);
let events_sender = Arc::new(events_sender);
let (events_sender, events_receiver) = watch::channel(TaskStateUpdate::Started);
let sender = Arc::clone(&events_sender);
let _ = WALRECEIVER_RUNTIME.spawn(async move {
events_sender.send(TaskEvent::Started).ok();
task(sender, cancellation_receiver).await
let join_handle = WALRECEIVER_RUNTIME.spawn(async move {
events_sender.send(TaskStateUpdate::Started).ok();
task(events_sender, cancellation_receiver).await
});
TaskHandle {
join_handle: Some(join_handle),
events_receiver,
cancellation,
}
@@ -126,15 +132,45 @@ impl<E: Clone> TaskHandle<E> {
async fn next_task_event(&mut self) -> TaskEvent<E> {
match self.events_receiver.changed().await {
Ok(()) => self.events_receiver.borrow().clone(),
Err(_task_channel_part_dropped) => TaskEvent::End,
Ok(()) => TaskEvent::Update((self.events_receiver.borrow()).clone()),
Err(_task_channel_part_dropped) => {
TaskEvent::End(match self.join_handle.take() {
Some(jh) => {
if !jh.is_finished() {
warn!("sender is dropped while join handle is still alive");
}
jh.await
.map_err(|e| anyhow::anyhow!("Failed to join task: {e}"))
.and_then(|x| x)
}
None => {
// Another option is to have an enum, join handle or result and give away the reference to it
Err(anyhow::anyhow!("Task was joined more than once"))
}
})
}
}
}
/// Aborts current task, waiting for it to finish.
pub async fn shutdown(mut self) {
self.cancellation.send(()).ok();
// wait until the sender is dropped
while self.events_receiver.changed().await.is_ok() {}
pub async fn shutdown(self) {
match self.join_handle {
Some(jh) => {
self.cancellation.send(()).ok();
match jh.await {
Ok(Ok(())) => debug!("Shutdown success"),
Ok(Err(e)) => error!("Shutdown task error: {e:?}"),
Err(join_error) => {
if join_error.is_cancelled() {
error!("Shutdown task was cancelled");
} else {
error!("Shutdown task join error: {join_error}")
}
}
}
}
None => {}
}
}
}

View File

@@ -16,10 +16,10 @@ use std::{
time::Duration,
};
use crate::task_mgr;
use crate::task_mgr::TaskKind;
use crate::task_mgr::WALRECEIVER_RUNTIME;
use crate::tenant::Timeline;
use crate::{task_mgr, walreceiver::TaskStateUpdate};
use anyhow::Context;
use chrono::{NaiveDateTime, Utc};
use etcd_broker::{
@@ -145,19 +145,26 @@ async fn connection_manager_loop_step(
let wal_connection = walreceiver_state.wal_connection.as_mut()
.expect("Should have a connection, as checked by the corresponding select! guard");
match wal_connection_update {
TaskEvent::Started => {},
TaskEvent::NewEvent(status) => {
if status.has_processed_wal {
// We have advanced last_record_lsn by processing the WAL received
// from this safekeeper. This is good enough to clean unsuccessful
// retries history and allow reconnecting to this safekeeper without
// sleeping for a long time.
walreceiver_state.wal_connection_retries.remove(&wal_connection.sk_id);
TaskEvent::Update(c) => {
match c {
TaskStateUpdate::Init | TaskStateUpdate::Started => {},
TaskStateUpdate::Progress(status) => {
if status.has_processed_wal {
// We have advanced last_record_lsn by processing the WAL received
// from this safekeeper. This is good enough to clean unsuccessful
// retries history and allow reconnecting to this safekeeper without
// sleeping for a long time.
walreceiver_state.wal_connection_retries.remove(&wal_connection.sk_id);
}
wal_connection.status = status.to_owned();
}
}
wal_connection.status = status;
},
TaskEvent::End => {
debug!("WAL receiving task finished");
TaskEvent::End(walreceiver_task_result) => {
match walreceiver_task_result {
Ok(()) => debug!("WAL receiving task finished"),
Err(e) => error!("wal receiver task finished with an error: {e:?}"),
}
walreceiver_state.drop_old_connection(false).await;
},
}
@@ -363,13 +370,13 @@ impl WalreceiverState {
async move {
super::walreceiver_connection::handle_walreceiver_connection(
timeline,
&new_wal_source_connstr,
events_sender.as_ref(),
new_wal_source_connstr,
events_sender,
cancellation,
connect_timeout,
)
.await
.map_err(|e| format!("walreceiver connection handling failure: {e:#}"))
.context("walreceiver connection handling failure")
}
.instrument(info_span!("walreceiver_connection", id = %id))
});
@@ -885,7 +892,7 @@ mod tests {
status: connection_status.clone(),
connection_task: TaskHandle::spawn(move |sender, _| async move {
sender
.send(TaskEvent::NewEvent(connection_status.clone()))
.send(TaskStateUpdate::Progress(connection_status.clone()))
.ok();
Ok(())
}),
@@ -1145,7 +1152,7 @@ mod tests {
status: connection_status.clone(),
connection_task: TaskHandle::spawn(move |sender, _| async move {
sender
.send(TaskEvent::NewEvent(connection_status.clone()))
.send(TaskStateUpdate::Progress(connection_status.clone()))
.ok();
Ok(())
}),
@@ -1233,7 +1240,7 @@ mod tests {
status: connection_status.clone(),
connection_task: TaskHandle::spawn(move |sender, _| async move {
sender
.send(TaskEvent::NewEvent(connection_status.clone()))
.send(TaskStateUpdate::Progress(connection_status.clone()))
.ok();
Ok(())
}),

View File

@@ -16,10 +16,9 @@ use postgres_protocol::message::backend::ReplicationMessage;
use postgres_types::PgLsn;
use tokio::{pin, select, sync::watch, time};
use tokio_postgres::{replication::ReplicationStream, Client};
use tracing::{debug, error, info, info_span, trace, warn, Instrument};
use tracing::{debug, error, info, trace, warn};
use super::TaskEvent;
use crate::metrics::LIVE_CONNECTIONS_COUNT;
use crate::{metrics::LIVE_CONNECTIONS_COUNT, walreceiver::TaskStateUpdate};
use crate::{
task_mgr,
task_mgr::TaskKind,
@@ -55,8 +54,8 @@ pub struct WalConnectionStatus {
/// messages as we go.
pub async fn handle_walreceiver_connection(
timeline: Arc<Timeline>,
wal_source_connstr: &str,
events_sender: &watch::Sender<TaskEvent<WalConnectionStatus>>,
wal_source_connstr: String,
events_sender: watch::Sender<TaskStateUpdate<WalConnectionStatus>>,
mut cancellation: watch::Receiver<()>,
connect_timeout: Duration,
) -> anyhow::Result<()> {
@@ -81,7 +80,7 @@ pub async fn handle_walreceiver_connection(
streaming_lsn: None,
commit_lsn: None,
};
if let Err(e) = events_sender.send(TaskEvent::NewEvent(connection_status.clone())) {
if let Err(e) = events_sender.send(TaskStateUpdate::Progress(connection_status.clone())) {
warn!("Wal connection event listener dropped right after connection init, aborting the connection: {e}");
return Ok(());
}
@@ -112,8 +111,7 @@ pub async fn handle_walreceiver_connection(
_ = connection_cancellation.changed() => info!("Connection cancelled"),
}
Ok(())
}
.instrument(info_span!("walreceiver connection")),
},
);
// Immediately increment the gauge, then create a job to decrement it on task exit.
@@ -134,7 +132,7 @@ pub async fn handle_walreceiver_connection(
connection_status.latest_connection_update = Utc::now().naive_utc();
connection_status.latest_wal_update = Utc::now().naive_utc();
connection_status.commit_lsn = Some(end_of_wal);
if let Err(e) = events_sender.send(TaskEvent::NewEvent(connection_status.clone())) {
if let Err(e) = events_sender.send(TaskStateUpdate::Progress(connection_status.clone())) {
warn!("Wal connection event listener dropped after IDENTIFY_SYSTEM, aborting the connection: {e}");
return Ok(());
}
@@ -202,7 +200,7 @@ pub async fn handle_walreceiver_connection(
}
&_ => {}
};
if let Err(e) = events_sender.send(TaskEvent::NewEvent(connection_status.clone())) {
if let Err(e) = events_sender.send(TaskStateUpdate::Progress(connection_status.clone())) {
warn!("Wal connection event listener dropped, aborting the connection: {e}");
return Ok(());
}
@@ -268,7 +266,8 @@ pub async fn handle_walreceiver_connection(
if !connection_status.has_processed_wal && last_rec_lsn > last_rec_lsn_before_msg {
// We have successfully processed at least one WAL record.
connection_status.has_processed_wal = true;
if let Err(e) = events_sender.send(TaskEvent::NewEvent(connection_status.clone())) {
if let Err(e) = events_sender.send(TaskStateUpdate::Progress(connection_status.clone()))
{
warn!("Wal connection event listener dropped, aborting the connection: {e}");
return Ok(());
}

View File

@@ -259,3 +259,15 @@ fn parse_host_port(input: &str) -> Option<(&str, u16)> {
let (host, port) = input.split_once(':')?;
Some((host, port.parse().ok()?))
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_parse_host_port() {
let (host, port) = parse_host_port("127.0.0.1:5432").expect("failed to parse");
assert_eq!(host, "127.0.0.1");
assert_eq!(port, 5432);
}
}

View File

@@ -54,13 +54,10 @@ impl<'a> ClientCredentials<'a> {
let dbname = get_param("database")?;
// Project name might be passed via PG's command-line options.
let project_a = params.options_raw().and_then(|options| {
for opt in options {
if let Some(value) = opt.strip_prefix("project=") {
return Some(Cow::Borrowed(value));
}
}
None
let project_a = params.options_raw().and_then(|mut options| {
options
.find_map(|opt| opt.strip_prefix("project="))
.map(Cow::Borrowed)
});
// Alternative project name is in fact a subdomain from SNI.

View File

@@ -52,6 +52,16 @@ impl CancelMap {
let session = Session::new(key, self);
f(session).await
}
#[cfg(test)]
fn contains(&self, session: &Session) -> bool {
self.0.lock().contains_key(&session.key)
}
#[cfg(test)]
fn is_empty(&self) -> bool {
self.0.lock().is_empty()
}
}
/// This should've been a [`std::future::Future`], but
@@ -104,3 +114,39 @@ impl<'a> Session<'a> {
self.key
}
}
#[cfg(test)]
mod tests {
use super::*;
use once_cell::sync::Lazy;
#[tokio::test]
async fn check_session_drop() -> anyhow::Result<()> {
static CANCEL_MAP: Lazy<CancelMap> = Lazy::new(Default::default);
let (tx, rx) = tokio::sync::oneshot::channel();
let task = tokio::spawn(CANCEL_MAP.with_session(|session| async move {
assert!(CANCEL_MAP.contains(&session));
tx.send(()).expect("failed to send");
let () = futures::future::pending().await; // sleep forever
Ok(())
}));
// Wait until the task has been spawned.
let () = rx.await.context("failed to hear from the task")?;
// Drop the session's entry by cancelling the task.
task.abort();
let error = task.await.expect_err("task should have failed");
if !error.is_cancelled() {
anyhow::bail!(error);
}
// Check that the session has been dropped.
assert!(CANCEL_MAP.is_empty());
Ok(())
}
}

View File

@@ -1,6 +1,5 @@
//! Small parsing helpers.
use std::convert::TryInto;
use std::ffi::CStr;
pub fn split_cstr(bytes: &[u8]) -> Option<(&CStr, &[u8])> {
@@ -10,9 +9,36 @@ pub fn split_cstr(bytes: &[u8]) -> Option<(&CStr, &[u8])> {
Some((unsafe { CStr::from_bytes_with_nul_unchecked(cstr) }, other))
}
/// See <https://doc.rust-lang.org/std/primitive.slice.html#method.split_array_ref>.
pub fn split_at_const<const N: usize>(bytes: &[u8]) -> Option<(&[u8; N], &[u8])> {
(bytes.len() >= N).then(|| {
let (head, tail) = bytes.split_at(N);
(head.try_into().unwrap(), tail)
})
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_split_cstr() {
assert!(split_cstr(b"").is_none());
assert!(split_cstr(b"foo").is_none());
let (cstr, rest) = split_cstr(b"\0").expect("uh-oh");
assert_eq!(cstr.to_bytes(), b"");
assert_eq!(rest, b"");
let (cstr, rest) = split_cstr(b"foo\0bar").expect("uh-oh");
assert_eq!(cstr.to_bytes(), b"foo");
assert_eq!(rest, b"bar");
}
#[test]
fn test_split_at_const() {
assert!(split_at_const::<0>(b"").is_some());
assert!(split_at_const::<1>(b"").is_none());
assert!(matches!(split_at_const::<1>(b"ok"), Some((b"o", b"k"))));
}
}

View File

@@ -248,6 +248,18 @@ pub fn upgrade_control_file(buf: &[u8], version: u32) -> Result<SafeKeeperState>
oldstate.timeline_start_lsn = Lsn(1);
oldstate.local_start_lsn = Lsn(1);
return Ok(oldstate);
} else if version == 6 {
info!("reading safekeeper control file version {}", version);
let mut oldstate = SafeKeeperState::des(&buf[..buf.len()])?;
if oldstate.server.pg_version != 0 {
return Ok(oldstate);
}
// set pg_version to the default v14
info!("setting pg_version to 140005");
oldstate.server.pg_version = 140005;
return Ok(oldstate);
}
bail!("unsupported safekeeper control file version {}", version)

View File

@@ -2,7 +2,7 @@
use std::time::{Instant, SystemTime};
-use ::metrics::{register_histogram, GaugeVec, Histogram, DISK_WRITE_SECONDS_BUCKETS};
+use ::metrics::{register_histogram, GaugeVec, Histogram, IntGauge, DISK_WRITE_SECONDS_BUCKETS};
use anyhow::Result;
use metrics::{
core::{AtomicU64, Collector, Desc, GenericGaugeVec, Opts},
@@ -135,6 +135,7 @@ pub struct TimelineCollector {
written_wal_seconds: GaugeVec,
flushed_wal_seconds: GaugeVec,
collect_timeline_metrics: Gauge,
timelines_count: IntGauge,
}
impl Default for TimelineCollector {
@@ -311,6 +312,13 @@ impl TimelineCollector {
.unwrap();
descs.extend(collect_timeline_metrics.desc().into_iter().cloned());
let timelines_count = IntGauge::new(
"safekeeper_timelines",
"Total number of timelines loaded in-memory",
)
.unwrap();
descs.extend(timelines_count.desc().into_iter().cloned());
TimelineCollector {
descs,
commit_lsn,
@@ -330,6 +338,7 @@ impl TimelineCollector {
written_wal_seconds,
flushed_wal_seconds,
collect_timeline_metrics,
timelines_count,
}
}
}
@@ -361,6 +370,7 @@ impl Collector for TimelineCollector {
self.flushed_wal_seconds.reset();
let timelines = GlobalTimelines::get_all();
let timelines_count = timelines.len();
for arc_tli in timelines {
let tli = arc_tli.info_for_metrics();
@@ -474,6 +484,10 @@ impl Collector for TimelineCollector {
self.collect_timeline_metrics.set(elapsed);
mfs.extend(self.collect_timeline_metrics.collect());
// report total number of timelines
self.timelines_count.set(timelines_count as i64);
mfs.extend(self.timelines_count.collect());
mfs
}
}
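The hunks above follow the usual prometheus custom-collector pattern: construct the metric once, register its descriptor in `descs`, then refresh and emit it on every scrape so the value cannot go stale. A minimal sketch of that pattern against the plain `prometheus` crate (assuming the `metrics` crate used here re-exports these types; the hard-coded 42 stands in for `GlobalTimelines::get_all().len()`):

    use prometheus::core::{Collector, Desc};
    use prometheus::{proto::MetricFamily, IntGauge};

    struct TimelinesCollector {
        descs: Vec<Desc>,
        timelines_count: IntGauge,
    }

    impl TimelinesCollector {
        fn new() -> Self {
            let timelines_count = IntGauge::new(
                "safekeeper_timelines",
                "Total number of timelines loaded in-memory",
            )
            .unwrap();
            let descs = timelines_count.desc().into_iter().cloned().collect();
            Self { descs, timelines_count }
        }
    }

    impl Collector for TimelinesCollector {
        fn desc(&self) -> Vec<&Desc> {
            self.descs.iter().collect()
        }

        fn collect(&self) -> Vec<MetricFamily> {
            // Recomputed on every scrape.
            self.timelines_count.set(42);
            self.timelines_count.collect()
        }
    }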

View File

@@ -25,7 +25,7 @@ use utils::{
};
pub const SK_MAGIC: u32 = 0xcafeceefu32;
-pub const SK_FORMAT_VERSION: u32 = 6;
+pub const SK_FORMAT_VERSION: u32 = 7;
const SK_PROTOCOL_VERSION: u32 = 2;
pub const UNKNOWN_SERVER_VERSION: u32 = 0;
@@ -639,7 +639,6 @@ where
let mut state = self.state.clone();
state.server.system_id = msg.system_id;
state.server.wal_seg_size = msg.wal_seg_size;
if msg.pg_version != UNKNOWN_SERVER_VERSION {
state.server.pg_version = msg.pg_version;
}
@@ -830,10 +829,6 @@ where
self.epoch_start_lsn = msg.h.epoch_start_lsn;
self.inmem.proposer_uuid = msg.h.proposer_uuid;
-// bootstrap the decoder, if not yet
-self.wal_store
-.init_decoder(self.state.server.pg_version / 10000, self.state.commit_lsn)?;
// do the job
if !msg.wal_data.is_empty() {
self.wal_store.write_wal(msg.h.begin_lsn, &msg.wal_data)?;
@@ -990,10 +985,6 @@ mod tests {
}
impl wal_storage::Storage for DummyWalStore {
-fn init_decoder(&mut self, _pg_majorversion: u32, _commit_lsn: Lsn) -> Result<()> {
-Ok(())
-}
fn flush_lsn(&self) -> Lsn {
self.lsn
}

View File

@@ -314,6 +314,8 @@ impl Timeline {
ttid: TenantTimelineId,
wal_backup_launcher_tx: Sender<TenantTimelineId>,
) -> Result<Timeline> {
let _enter = info_span!("load_timeline", timeline = %ttid.timeline_id).entered();
let shared_state = SharedState::restore(&conf, &ttid)?;
let (commit_lsn_watch_tx, commit_lsn_watch_rx) =
watch::channel(shared_state.sk.state.commit_lsn);
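The `info_span!(...).entered()` guard at the top of `load_timeline` keeps the span active until it is dropped at the end of the function, so everything `SharedState::restore` logs is tagged with the timeline id. A minimal sketch of the pattern (names illustrative):

    use tracing::{info, info_span};

    fn load(timeline_id: &str) {
        // `%` records the field using its Display impl; the guard
        // holds the span open for the rest of this scope.
        let _enter = info_span!("load_timeline", timeline = %timeline_id).entered();
        info!("restoring shared state"); // emitted inside the span
    }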

View File

@@ -38,8 +38,6 @@ use postgres_ffi::waldecoder::WalStreamDecoder;
use tokio::io::{AsyncReadExt, AsyncSeekExt};
pub trait Storage {
-// Bootstrap the wal decoder with correct pg_version
-fn init_decoder(&mut self, pg_majorversion: u32, commit_lsn: Lsn) -> Result<()>;
/// LSN of last durably stored WAL record.
fn flush_lsn(&self) -> Lsn;
@@ -89,8 +87,7 @@ pub struct PhysicalStorage {
flush_record_lsn: Lsn,
/// Decoder is required for detecting boundaries of WAL records.
-/// None until it is initialized
-decoder: Option<WalStreamDecoder>,
+decoder: WalStreamDecoder,
/// Cached open file for the last segment.
///
@@ -114,19 +111,33 @@ impl PhysicalStorage {
// Find out where stored WAL ends, starting at commit_lsn which is a
// known recent record boundary (unless we don't have WAL at all).
//
// NB: find_end_of_wal MUST be backwards compatible with the previously
// written WAL. If find_end_of_wal fails to read any WAL written by an
// older version of the code, we could lose data forever.
let write_lsn = if state.commit_lsn == Lsn(0) {
Lsn(0)
} else {
-// FIXME What would be the correct value here, if we can not
-// call find_end_of_wal yet, because we don't know pg_version?
-state.commit_lsn
+match state.server.pg_version / 10000 {
+14 => postgres_ffi::v14::xlog_utils::find_end_of_wal(
+&timeline_dir,
+wal_seg_size,
+state.commit_lsn,
+)?,
+15 => postgres_ffi::v15::xlog_utils::find_end_of_wal(
+&timeline_dir,
+wal_seg_size,
+state.commit_lsn,
+)?,
+_ => bail!("unsupported postgres version: {}", state.server.pg_version),
+}
};
// TODO: do we really know that write_lsn is fully flushed to disk?
// If not, maybe it's better to call fsync() here to be sure?
let flush_lsn = write_lsn;
-info!(
+debug!(
"initialized storage for timeline {}, flush_lsn={}, commit_lsn={}, peer_horizon_lsn={}",
ttid.timeline_id, flush_lsn, state.commit_lsn, state.peer_horizon_lsn,
);
@@ -142,7 +153,7 @@ impl PhysicalStorage {
write_lsn,
write_record_lsn: write_lsn,
flush_record_lsn: flush_lsn,
-decoder: None,
+decoder: WalStreamDecoder::new(write_lsn, state.server.pg_version / 10000),
file: None,
})
}
@@ -257,42 +268,6 @@ impl PhysicalStorage {
}
impl Storage for PhysicalStorage {
-fn init_decoder(&mut self, pg_majorversion: u32, commit_lsn: Lsn) -> Result<()> {
-if self.decoder.is_some() {
-return Ok(());
-}
-info!(
-"init_decoder for pg_version {} and commit_lsn {}",
-pg_majorversion, commit_lsn
-);
-let write_lsn = match pg_majorversion {
-14 => postgres_ffi::v14::xlog_utils::find_end_of_wal(
-&self.timeline_dir,
-self.wal_seg_size,
-commit_lsn,
-)?,
-15 => postgres_ffi::v15::xlog_utils::find_end_of_wal(
-&self.timeline_dir,
-self.wal_seg_size,
-commit_lsn,
-)?,
-_ => bail!("unsupported postgres version"),
-};
-info!(
-"init_decoder for pg_version {} and commit_lsn {}. write_lsn = {}",
-pg_majorversion, commit_lsn, write_lsn
-);
-self.decoder = Some(WalStreamDecoder::new(write_lsn, pg_majorversion));
-self.flush_record_lsn = write_lsn;
-self.write_record_lsn = write_lsn;
-Ok(())
-}
/// flush_lsn returns LSN of last durably stored WAL record.
fn flush_lsn(&self) -> Lsn {
self.flush_record_lsn
@@ -324,18 +299,18 @@ impl Storage for PhysicalStorage {
// figure out last record's end lsn for reporting (if we got the
// whole record)
-if self.decoder.as_ref().unwrap().available() != startpos {
+if self.decoder.available() != startpos {
info!(
"restart decoder from {} to {}",
-self.decoder.as_ref().unwrap().available(),
+self.decoder.available(),
startpos,
);
-let pg_version = self.decoder.as_ref().unwrap().pg_version;
-self.decoder = Some(WalStreamDecoder::new(startpos, pg_version));
+let pg_version = self.decoder.pg_version;
+self.decoder = WalStreamDecoder::new(startpos, pg_version);
}
-self.decoder.as_mut().unwrap().feed_bytes(buf);
+self.decoder.feed_bytes(buf);
loop {
-match self.decoder.as_mut().unwrap().poll_decode()? {
+match self.decoder.poll_decode()? {
None => break, // no full record yet
Some((lsn, _rec)) => {
self.write_record_lsn = lsn;
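The shape of this refactor: because `pg_version` is now always known up front (it is persisted in the control file, and the v6-to-v7 upgrade above backfills a default), the decoder can be built eagerly in the constructor. That lets `Option<WalStreamDecoder>` and every `as_ref().unwrap()` on it disappear, and `init_decoder` is dropped from the `Storage` trait along with its `DummyWalStore` stub.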

View File

@@ -710,8 +710,8 @@ if __name__ == "__main__":
"--psql-path",
dest="psql_path",
required=False,
default="/usr/local/bin/psql",
help="Path to the psql binary. Default: /usr/local/bin/psql",
default="/usr/local/v14/bin/psql",
help="Path to the psql binary. Default: /usr/local/v14/bin/psql",
)
parser.add_argument(
"--only-import",

View File

@@ -1,6 +1,5 @@
import logging
import logging.config
-import re
"""
This file configures logging for use in Python tests.
@@ -30,17 +29,6 @@ LOGGING = {
}
-class PasswordFilter(logging.Filter):
-"""Filter out password from logs."""
-# Good enough to filter our passwords produced by PgProtocol.connstr
-FILTER = re.compile(r"(\s*)password=[^\s]+(\s*)")
-def filter(self, record: logging.LogRecord) -> bool:
-record.msg = self.FILTER.sub(r"\1password=<hidden>\2", str(record.msg))
-return True
def getLogger(name="root") -> logging.Logger:
"""Method to get logger for tests.
@@ -50,6 +38,5 @@ def getLogger(name="root") -> logging.Logger:
# default logger for tests
log = getLogger()
-log.addFilter(PasswordFilter())
logging.config.dictConfig(LOGGING)
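Removing `PasswordFilter` is safe because of the pgbench change below: passwords are dropped from the connection strings that reach the logs, so there is nothing left for the `(\s*)password=[^\s]+(\s*)` regex to scrub. Previously a logged connstr such as `host=localhost port=5432 password=s3cret dbname=neon` (example values) would have been rewritten to `host=localhost port=5432 password=<hidden> dbname=neon`.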

View File

@@ -283,10 +283,15 @@ class PgProtocol:
return str(make_dsn(**self.conn_options(**kwargs)))
def conn_options(self, **kwargs):
"""
Construct a dictionary of connection options from default values and extra parameters.
An option can be dropped from the returned dictionary by passing a None-valued extra parameter.
"""
result = self.default_options.copy()
if "dsn" in kwargs:
result.update(parse_dsn(kwargs["dsn"]))
result.update(kwargs)
result = {k: v for k, v in result.items() if v is not None}
# Individual statement timeout in seconds. 2 minutes should be
# enough for our tests, but if you need a longer one, you can

View File

@@ -4,7 +4,7 @@ import os
import timeit
from datetime import datetime
from pathlib import Path
-from typing import List
+from typing import Dict, List, Optional
import pytest
from fixtures.benchmark_fixture import MetricReport, PgBenchInitResult, PgBenchRunResult
@@ -24,14 +24,18 @@ def utc_now_timestamp() -> int:
return calendar.timegm(datetime.utcnow().utctimetuple())
-def init_pgbench(env: PgCompare, cmdline):
+def init_pgbench(env: PgCompare, cmdline, password: Optional[str]):
environ: Dict[str, str] = {}
if password is not None:
environ["PGPASSWORD"] = password
# calculate timestamps and durations separately
# timestamp is intended to be used for linking to grafana and logs
# duration is actually a metric and uses float instead of int for timestamp
start_timestamp = utc_now_timestamp()
t0 = timeit.default_timer()
with env.record_pageserver_writes("init.pageserver_writes"):
-out = env.pg_bin.run_capture(cmdline)
+out = env.pg_bin.run_capture(cmdline, env=environ)
env.flush()
duration = timeit.default_timer() - t0
@@ -48,13 +52,15 @@ def init_pgbench(env: PgCompare, cmdline):
env.zenbenchmark.record_pg_bench_init_result("init", res)
-def run_pgbench(env: PgCompare, prefix: str, cmdline):
+def run_pgbench(env: PgCompare, prefix: str, cmdline, password: Optional[str]):
environ: Dict[str, str] = {}
if password is not None:
environ["PGPASSWORD"] = password
with env.record_pageserver_writes(f"{prefix}.pageserver_writes"):
run_start_timestamp = utc_now_timestamp()
t0 = timeit.default_timer()
-out = env.pg_bin.run_capture(
-cmdline,
-)
+out = env.pg_bin.run_capture(cmdline, env=environ)
run_duration = timeit.default_timer() - t0
run_end_timestamp = utc_now_timestamp()
env.flush()
@@ -82,10 +88,14 @@ def run_pgbench(env: PgCompare, prefix: str, cmdline):
def run_test_pgbench(env: PgCompare, scale: int, duration: int, workload_type: PgBenchLoadType):
env.zenbenchmark.record("scale", scale, "", MetricReport.TEST_PARAM)
password = env.pg.default_options.get("password", None)
options = "-cstatement_timeout=1h " + env.pg.default_options.get("options", "")
# drop password from the connection string by passing password=None and set password separately
connstr = env.pg.connstr(password=None, options=options)
if workload_type == PgBenchLoadType.INIT:
# Run initialize
options = "-cstatement_timeout=1h " + env.pg.default_options.get("options", "")
init_pgbench(env, ["pgbench", f"-s{scale}", "-i", env.pg.connstr(options=options)])
init_pgbench(env, ["pgbench", f"-s{scale}", "-i", connstr], password=password)
if workload_type == PgBenchLoadType.SIMPLE_UPDATE:
# Run simple-update workload
@@ -99,8 +109,9 @@ def run_test_pgbench(env: PgCompare, scale: int, duration: int, workload_type: P
f"-T{duration}",
"-P2",
"--progress-timestamp",
-env.pg.connstr(),
+connstr,
],
password=password,
)
if workload_type == PgBenchLoadType.SELECT_ONLY:
@@ -115,8 +126,9 @@ def run_test_pgbench(env: PgCompare, scale: int, duration: int, workload_type: P
f"-T{duration}",
"-P2",
"--progress-timestamp",
-env.pg.connstr(),
+connstr,
],
password=password,
)
env.report_size()
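The password plumbing in this last file is the other half of that cleanup: the password is stripped from the connection string handed to pgbench (whose command line `run_capture` presumably echoes into the test output) and supplied through libpq's standard `PGPASSWORD` environment variable instead, so it never appears in logged output in the first place.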