mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-14 03:30:36 +00:00
Compare commits
48 Commits
support_pg
...
bayandin/g
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d884af4fb9 | ||
|
|
d98c53174e | ||
|
|
dabb6d2675 | ||
|
|
fc7087b16f | ||
|
|
2233ca2a39 | ||
|
|
fb68d01449 | ||
|
|
d15116f2cc | ||
|
|
df45c0d0e5 | ||
|
|
367cc01290 | ||
|
|
1165686201 | ||
|
|
093264a695 | ||
|
|
805bb198c2 | ||
|
|
5ccd54c699 | ||
|
|
1dffba9de6 | ||
|
|
ebab89ebd2 | ||
|
|
bc3ba23e0a | ||
|
|
3e65209a06 | ||
|
|
eb0c6bcf1a | ||
|
|
52819898e4 | ||
|
|
b0377f750a | ||
|
|
43560506c0 | ||
|
|
c81ede8644 | ||
|
|
eb9200abc8 | ||
|
|
7c1695e87d | ||
|
|
8b42c184e7 | ||
|
|
7138db9279 | ||
|
|
262fa3be09 | ||
|
|
5e151192f5 | ||
|
|
2d012f0d32 | ||
|
|
64f64d5637 | ||
|
|
1fa7d6aebf | ||
|
|
d098542dde | ||
|
|
eba419fda3 | ||
|
|
d8d3cd49f4 | ||
|
|
3618c242b9 | ||
|
|
ed6b75e301 | ||
|
|
862902f9e5 | ||
|
|
8d890b3cbb | ||
|
|
0fde59aa46 | ||
|
|
1255ef806f | ||
|
|
5dddeb8d88 | ||
|
|
d45de3d58f | ||
|
|
a69e060f0f | ||
|
|
a4397d43e9 | ||
|
|
03c606f7c5 | ||
|
|
9dfede8146 | ||
|
|
86bf491981 | ||
|
|
e764c1e60f |
9
.github/actions/download/action.yml
vendored
9
.github/actions/download/action.yml
vendored
@@ -12,6 +12,9 @@ inputs:
|
||||
description: "Allow to skip if file doesn't exist, fail otherwise"
|
||||
default: false
|
||||
required: false
|
||||
prefix:
|
||||
description: "S3 prefix. Default is '${GITHUB_RUN_ID}/${GITHUB_RUN_ATTEMPT}'"
|
||||
required: false
|
||||
|
||||
runs:
|
||||
using: "composite"
|
||||
@@ -23,18 +26,18 @@ runs:
|
||||
TARGET: ${{ inputs.path }}
|
||||
ARCHIVE: /tmp/downloads/${{ inputs.name }}.tar.zst
|
||||
SKIP_IF_DOES_NOT_EXIST: ${{ inputs.skip-if-does-not-exist }}
|
||||
PREFIX: artifacts/${{ inputs.prefix || format('{0}/{1}', github.run_id, github.run_attempt) }}
|
||||
run: |
|
||||
BUCKET=neon-github-public-dev
|
||||
PREFIX=artifacts/${GITHUB_RUN_ID}
|
||||
FILENAME=$(basename $ARCHIVE)
|
||||
|
||||
S3_KEY=$(aws s3api list-objects-v2 --bucket ${BUCKET} --prefix ${PREFIX} | jq -r '.Contents[].Key' | grep ${FILENAME} | sort --version-sort | tail -1 || true)
|
||||
S3_KEY=$(aws s3api list-objects-v2 --bucket ${BUCKET} --prefix ${PREFIX%$GITHUB_RUN_ATTEMPT} | jq -r '.Contents[].Key' | grep ${FILENAME} | sort --version-sort | tail -1 || true)
|
||||
if [ -z "${S3_KEY}" ]; then
|
||||
if [ "${SKIP_IF_DOES_NOT_EXIST}" = "true" ]; then
|
||||
echo '::set-output name=SKIPPED::true'
|
||||
exit 0
|
||||
else
|
||||
echo 2>&1 "Neither s3://${BUCKET}/${PREFIX}/${GITHUB_RUN_ATTEMPT}/${FILENAME} nor its version from previous attempts exist"
|
||||
echo 2>&1 "Neither s3://${BUCKET}/${PREFIX}/${FILENAME} nor its version from previous attempts exist"
|
||||
exit 1
|
||||
fi
|
||||
fi
|
||||
|
||||
34
.github/actions/neon-project-create/action.yml
vendored
34
.github/actions/neon-project-create/action.yml
vendored
@@ -63,7 +63,39 @@ runs:
|
||||
\"name\": \"Created by actions/neon-project-create; GITHUB_RUN_ID=${GITHUB_RUN_ID}\",
|
||||
\"platform_id\": \"aws\",
|
||||
\"region_id\": \"${REGION_ID}\",
|
||||
\"settings\": { }
|
||||
\"settings\": {
|
||||
\"constraint_exclusion\": \"off\",
|
||||
\"cpu_index_tuple_cost\": \"1.637272286893718e+308\",
|
||||
\"cpu_operator_cost\": \"1.79769e+308\",
|
||||
\"cpu_tuple_cost\": \"1.6957485653186957e+308\",
|
||||
\"cursor_tuple_fraction\": \"0.4685070675008615\",
|
||||
\"enable_async_append\": \"off\",
|
||||
\"enable_bitmapscan\": \"on\",
|
||||
\"enable_gathermerge\": \"on\",
|
||||
\"enable_hashagg\": \"on\",
|
||||
\"enable_hashjoin\": \"off\",
|
||||
\"enable_incremental_sort\": \"on\",
|
||||
\"enable_indexonlyscan\": \"on\",
|
||||
\"enable_indexscan\": \"on\",
|
||||
\"enable_material\": \"off\",
|
||||
\"enable_mergejoin\": \"off\",
|
||||
\"enable_nestloop\": \"off\",
|
||||
\"enable_parallel_append\": \"off\",
|
||||
\"enable_parallel_hash\": \"off\",
|
||||
\"enable_partition_pruning\": \"on\",
|
||||
\"enable_partitionwise_aggregate\": \"off\",
|
||||
\"enable_partitionwise_join\": \"on\",
|
||||
\"enable_seqscan\": \"on\",
|
||||
\"enable_sort\": \"off\",
|
||||
\"enable_tidscan\": \"off\",
|
||||
\"from_collapse_limit\": \"1594910280\",
|
||||
\"min_parallel_table_scan_size\": \"0\",
|
||||
\"parallel_setup_cost\": \"1.79769e+308\",
|
||||
\"parallel_tuple_cost\": \"6.288448285557434e+307\",
|
||||
\"plan_cache_mode\": \"force_custom_plan\",
|
||||
\"random_page_cost\": \"2.468072863935744e+307\",
|
||||
\"seq_page_cost\": \"0.0\"
|
||||
}
|
||||
}
|
||||
}")
|
||||
|
||||
|
||||
@@ -127,7 +127,7 @@ runs:
|
||||
|
||||
# Wake up the cluster if we use remote neon instance
|
||||
if [ "${{ inputs.build_type }}" = "remote" ] && [ -n "${BENCHMARK_CONNSTR}" ]; then
|
||||
${POSTGRES_DISTRIB_DIR}/v14/bin/psql ${BENCHMARK_CONNSTR} -c "SELECT version();"
|
||||
${POSTGRES_DISTRIB_DIR}/v${DEFAULT_PG_VERSION}/bin/psql ${BENCHMARK_CONNSTR} -c "SELECT version();"
|
||||
fi
|
||||
|
||||
# Run the tests.
|
||||
|
||||
9
.github/actions/upload/action.yml
vendored
9
.github/actions/upload/action.yml
vendored
@@ -7,6 +7,9 @@ inputs:
|
||||
path:
|
||||
description: "A directory or file to upload"
|
||||
required: true
|
||||
prefix:
|
||||
description: "S3 prefix. Default is '${GITHUB_RUN_ID}/${GITHUB_RUN_ATTEMPT}'"
|
||||
required: false
|
||||
|
||||
runs:
|
||||
using: "composite"
|
||||
@@ -42,14 +45,14 @@ runs:
|
||||
env:
|
||||
SOURCE: ${{ inputs.path }}
|
||||
ARCHIVE: /tmp/uploads/${{ inputs.name }}.tar.zst
|
||||
PREFIX: artifacts/${{ inputs.prefix || format('{0}/{1}', github.run_id, github.run_attempt) }}
|
||||
run: |
|
||||
BUCKET=neon-github-public-dev
|
||||
PREFIX=artifacts/${GITHUB_RUN_ID}
|
||||
FILENAME=$(basename $ARCHIVE)
|
||||
|
||||
FILESIZE=$(du -sh ${ARCHIVE} | cut -f1)
|
||||
|
||||
time aws s3 mv --only-show-errors ${ARCHIVE} s3://${BUCKET}/${PREFIX}/${GITHUB_RUN_ATTEMPT}/${FILENAME}
|
||||
time aws s3 mv --only-show-errors ${ARCHIVE} s3://${BUCKET}/${PREFIX}/${FILENAME}
|
||||
|
||||
# Ref https://docs.github.com/en/actions/using-workflows/workflow-commands-for-github-actions#adding-a-job-summary
|
||||
echo "[${FILENAME}](https://${BUCKET}.s3.amazonaws.com/${PREFIX}/${GITHUB_RUN_ATTEMPT}/${FILENAME}) ${FILESIZE}" >> ${GITHUB_STEP_SUMMARY}
|
||||
echo "[${FILENAME}](https://${BUCKET}.s3.amazonaws.com/${PREFIX}/${FILENAME}) ${FILESIZE}" >> ${GITHUB_STEP_SUMMARY}
|
||||
|
||||
46
.github/ansible/deploy.yaml
vendored
46
.github/ansible/deploy.yaml
vendored
@@ -58,23 +58,23 @@
|
||||
creates: "/storage/pageserver/data/tenants"
|
||||
environment:
|
||||
NEON_REPO_DIR: "/storage/pageserver/data"
|
||||
LD_LIBRARY_PATH: "/usr/local/lib"
|
||||
LD_LIBRARY_PATH: "/usr/local/v14/lib"
|
||||
become: true
|
||||
tags:
|
||||
- pageserver
|
||||
|
||||
# - name: update remote storage (s3) config
|
||||
# lineinfile:
|
||||
# path: /storage/pageserver/data/pageserver.toml
|
||||
# line: "{{ item }}"
|
||||
# loop:
|
||||
# - "[remote_storage]"
|
||||
# - "bucket_name = '{{ bucket_name }}'"
|
||||
# - "bucket_region = '{{ bucket_region }}'"
|
||||
# - "prefix_in_bucket = '{{ inventory_hostname }}'"
|
||||
# become: true
|
||||
# tags:
|
||||
# - pageserver
|
||||
- name: update remote storage (s3) config
|
||||
lineinfile:
|
||||
path: /storage/pageserver/data/pageserver.toml
|
||||
line: "{{ item }}"
|
||||
loop:
|
||||
- "[remote_storage]"
|
||||
- "bucket_name = '{{ bucket_name }}'"
|
||||
- "bucket_region = '{{ bucket_region }}'"
|
||||
- "prefix_in_bucket = '{{ inventory_hostname }}'"
|
||||
become: true
|
||||
tags:
|
||||
- pageserver
|
||||
|
||||
- name: upload systemd service definition
|
||||
ansible.builtin.template:
|
||||
@@ -87,15 +87,15 @@
|
||||
tags:
|
||||
- pageserver
|
||||
|
||||
# - name: start systemd service
|
||||
# ansible.builtin.systemd:
|
||||
# daemon_reload: yes
|
||||
# name: pageserver
|
||||
# enabled: yes
|
||||
# state: restarted
|
||||
# become: true
|
||||
# tags:
|
||||
# - pageserver
|
||||
- name: start systemd service
|
||||
ansible.builtin.systemd:
|
||||
daemon_reload: yes
|
||||
name: pageserver
|
||||
enabled: yes
|
||||
state: restarted
|
||||
become: true
|
||||
tags:
|
||||
- pageserver
|
||||
|
||||
- name: post version to console
|
||||
when: console_mgmt_base_url is defined
|
||||
@@ -132,7 +132,7 @@
|
||||
creates: "/storage/safekeeper/data/safekeeper.id"
|
||||
environment:
|
||||
NEON_REPO_DIR: "/storage/safekeeper/data"
|
||||
LD_LIBRARY_PATH: "/usr/local/lib"
|
||||
LD_LIBRARY_PATH: "/usr/local/v14/lib"
|
||||
become: true
|
||||
tags:
|
||||
- safekeeper
|
||||
|
||||
6
.github/ansible/get_binaries.sh
vendored
6
.github/ansible/get_binaries.sh
vendored
@@ -21,10 +21,14 @@ docker pull --quiet neondatabase/neon:${DOCKER_TAG}
|
||||
ID=$(docker create neondatabase/neon:${DOCKER_TAG})
|
||||
docker cp ${ID}:/data/postgres_install.tar.gz .
|
||||
tar -xzf postgres_install.tar.gz -C neon_install
|
||||
mkdir neon_install/bin/
|
||||
docker cp ${ID}:/usr/local/bin/pageserver neon_install/bin/
|
||||
docker cp ${ID}:/usr/local/bin/safekeeper neon_install/bin/
|
||||
docker cp ${ID}:/usr/local/bin/proxy neon_install/bin/
|
||||
docker cp ${ID}:/usr/local/bin/postgres neon_install/bin/
|
||||
docker cp ${ID}:/usr/local/v14/bin/ neon_install/v14/bin/
|
||||
docker cp ${ID}:/usr/local/v15/bin/ neon_install/v15/bin/
|
||||
docker cp ${ID}:/usr/local/v14/lib/ neon_install/v14/lib/
|
||||
docker cp ${ID}:/usr/local/v15/lib/ neon_install/v15/lib/
|
||||
docker rm -vf ${ID}
|
||||
|
||||
# store version to file (for ansible playbooks) and create binaries tarball
|
||||
|
||||
4
.github/ansible/staging.hosts
vendored
4
.github/ansible/staging.hosts
vendored
@@ -3,11 +3,15 @@
|
||||
zenith-us-stage-ps-2 console_region_id=27
|
||||
zenith-us-stage-ps-3 console_region_id=27
|
||||
zenith-us-stage-ps-4 console_region_id=27
|
||||
zenith-us-stage-test-ps-1 console_region_id=28
|
||||
|
||||
[safekeepers]
|
||||
zenith-us-stage-sk-4 console_region_id=27
|
||||
zenith-us-stage-sk-5 console_region_id=27
|
||||
zenith-us-stage-sk-6 console_region_id=27
|
||||
zenith-us-stage-test-sk-1 console_region_id=28
|
||||
zenith-us-stage-test-sk-2 console_region_id=28
|
||||
zenith-us-stage-test-sk-3 console_region_id=28
|
||||
|
||||
[storage:children]
|
||||
pageservers
|
||||
|
||||
2
.github/ansible/systemd/pageserver.service
vendored
2
.github/ansible/systemd/pageserver.service
vendored
@@ -5,7 +5,7 @@ After=network.target auditd.service
|
||||
[Service]
|
||||
Type=simple
|
||||
User=pageserver
|
||||
Environment=RUST_BACKTRACE=1 NEON_REPO_DIR=/storage/pageserver LD_LIBRARY_PATH=/usr/local/lib
|
||||
Environment=RUST_BACKTRACE=1 NEON_REPO_DIR=/storage/pageserver LD_LIBRARY_PATH=/usr/local/v14/lib
|
||||
ExecStart=/usr/local/bin/pageserver -c "pg_distrib_dir='/usr/local'" -c "listen_pg_addr='0.0.0.0:6400'" -c "listen_http_addr='0.0.0.0:9898'" -c "broker_endpoints=['{{ etcd_endpoints }}']" -D /storage/pageserver/data
|
||||
ExecReload=/bin/kill -HUP $MAINPID
|
||||
KillMode=mixed
|
||||
|
||||
2
.github/ansible/systemd/safekeeper.service
vendored
2
.github/ansible/systemd/safekeeper.service
vendored
@@ -5,7 +5,7 @@ After=network.target auditd.service
|
||||
[Service]
|
||||
Type=simple
|
||||
User=safekeeper
|
||||
Environment=RUST_BACKTRACE=1 NEON_REPO_DIR=/storage/safekeeper/data LD_LIBRARY_PATH=/usr/local/lib
|
||||
Environment=RUST_BACKTRACE=1 NEON_REPO_DIR=/storage/safekeeper/data LD_LIBRARY_PATH=/usr/local/v14/lib
|
||||
ExecStart=/usr/local/bin/safekeeper -l {{ inventory_hostname }}.local:6500 --listen-http {{ inventory_hostname }}.local:7676 -D /storage/safekeeper/data --broker-endpoints={{ etcd_endpoints }} --remote-storage='{bucket_name="{{bucket_name}}", bucket_region="{{bucket_region}}", prefix_in_bucket="{{ env_name }}/wal"}'
|
||||
ExecReload=/bin/kill -HUP $MAINPID
|
||||
KillMode=mixed
|
||||
|
||||
41
.github/workflows/benchmarking.yml
vendored
41
.github/workflows/benchmarking.yml
vendored
@@ -46,7 +46,8 @@ jobs:
|
||||
runs-on: [self-hosted, zenith-benchmarker]
|
||||
|
||||
env:
|
||||
POSTGRES_DISTRIB_DIR: "/usr/pgsql-14"
|
||||
POSTGRES_DISTRIB_DIR: /tmp/pg_install
|
||||
DEFAULT_PG_VERSION: 14
|
||||
|
||||
steps:
|
||||
- name: Checkout zenith repo
|
||||
@@ -71,7 +72,7 @@ jobs:
|
||||
echo Poetry
|
||||
poetry --version
|
||||
echo Pgbench
|
||||
$POSTGRES_DISTRIB_DIR/bin/pgbench --version
|
||||
${POSTGRES_DISTRIB_DIR}/v${DEFAULT_PG_VERSION}/bin/pgbench --version
|
||||
|
||||
- name: Create Neon Project
|
||||
id: create-neon-project
|
||||
@@ -140,7 +141,8 @@ jobs:
|
||||
env:
|
||||
TEST_PG_BENCH_DURATIONS_MATRIX: "60m"
|
||||
TEST_PG_BENCH_SCALES_MATRIX: "10gb"
|
||||
POSTGRES_DISTRIB_DIR: /usr
|
||||
POSTGRES_DISTRIB_DIR: /tmp/neon/pg_install
|
||||
DEFAULT_PG_VERSION: 14
|
||||
TEST_OUTPUT: /tmp/test_output
|
||||
BUILD_TYPE: remote
|
||||
SAVE_PERF_REPORT: ${{ github.event.inputs.save_perf_report || ( github.ref == 'refs/heads/main' ) }}
|
||||
@@ -151,7 +153,7 @@ jobs:
|
||||
# neon-captest-new: Run pgbench in a freshly created project
|
||||
# neon-captest-reuse: Same, but reusing existing project
|
||||
# neon-captest-prefetch: Same, with prefetching enabled (new project)
|
||||
platform: [ neon-captest-new, neon-captest-reuse, neon-captest-prefetch, rds-aurora ]
|
||||
platform: [ neon-captest-prefetch ]
|
||||
|
||||
runs-on: dev
|
||||
container:
|
||||
@@ -163,10 +165,17 @@ jobs:
|
||||
steps:
|
||||
- uses: actions/checkout@v3
|
||||
|
||||
- name: Install Deps
|
||||
- name: Download Neon artifact
|
||||
uses: ./.github/actions/download
|
||||
with:
|
||||
name: neon-${{ runner.os }}-release-artifact
|
||||
path: /tmp/neon/
|
||||
prefix: latest
|
||||
|
||||
- name: Add Postgres binaries to PATH
|
||||
run: |
|
||||
sudo apt -y update
|
||||
sudo apt install -y postgresql-14
|
||||
${POSTGRES_DISTRIB_DIR}/v${DEFAULT_PG_VERSION}/bin/pgbench --version
|
||||
echo "${POSTGRES_DISTRIB_DIR}/v${DEFAULT_PG_VERSION}/bin" >> $GITHUB_PATH
|
||||
|
||||
- name: Create Neon Project
|
||||
if: matrix.platform != 'neon-captest-reuse'
|
||||
@@ -204,8 +213,24 @@ jobs:
|
||||
- name: Set database options
|
||||
if: matrix.platform == 'neon-captest-prefetch'
|
||||
run: |
|
||||
psql ${BENCHMARK_CONNSTR} -c "ALTER DATABASE main SET geqo_seed=1.0"
|
||||
psql ${BENCHMARK_CONNSTR} -c "ALTER DATABASE main SET geqo_effort=10"
|
||||
psql ${BENCHMARK_CONNSTR} -c "ALTER DATABASE main SET seqscan_prefetch_buffers=647"
|
||||
psql ${BENCHMARK_CONNSTR} -c "ALTER DATABASE main SET default_statistics_target=1"
|
||||
psql ${BENCHMARK_CONNSTR} -c "ALTER DATABASE main SET jit_above_cost=-1.0"
|
||||
psql ${BENCHMARK_CONNSTR} -c "ALTER DATABASE main SET join_collapse_limit=1"
|
||||
psql ${BENCHMARK_CONNSTR} -c "ALTER DATABASE main SET geqo_selection_bias=2.0"
|
||||
psql ${BENCHMARK_CONNSTR} -c "ALTER DATABASE main SET geqo_pool_size=1574739104"
|
||||
psql ${BENCHMARK_CONNSTR} -c "ALTER DATABASE main SET geqo_generations=2147483647"
|
||||
psql ${BENCHMARK_CONNSTR} -c "ALTER DATABASE main SET enable_memoize=off"
|
||||
psql ${BENCHMARK_CONNSTR} -c "ALTER DATABASE main SET jit_inline_above_cost=-1.0"
|
||||
psql ${BENCHMARK_CONNSTR} -c "ALTER DATABASE main SET min_parallel_index_scan_size=0"
|
||||
psql ${BENCHMARK_CONNSTR} -c "ALTER DATABASE main SET geqo_threshold=2"
|
||||
psql ${BENCHMARK_CONNSTR} -c "ALTER DATABASE main SET enable_seqscan_prefetch=on"
|
||||
psql ${BENCHMARK_CONNSTR} -c "ALTER DATABASE main SET seqscan_prefetch_buffers=10"
|
||||
psql ${BENCHMARK_CONNSTR} -c "ALTER DATABASE main SET jit=off"
|
||||
psql ${BENCHMARK_CONNSTR} -c "ALTER DATABASE main SET jit_optimize_above_cost=1.79769e+308"
|
||||
psql ${BENCHMARK_CONNSTR} -c "ALTER DATABASE main SET geqo=on"
|
||||
psql ${BENCHMARK_CONNSTR} -c "ALTER DATABASE main SET effective_cache_size=872767456"
|
||||
env:
|
||||
BENCHMARK_CONNSTR: ${{ steps.set-up-connstr.outputs.connstr }}
|
||||
|
||||
|
||||
42
.github/workflows/build_and_test.yml
vendored
42
.github/workflows/build_and_test.yml
vendored
@@ -268,6 +268,32 @@ jobs:
|
||||
if: matrix.build_type == 'debug'
|
||||
uses: ./.github/actions/save-coverage-data
|
||||
|
||||
upload-latest-artifacts:
|
||||
runs-on: dev
|
||||
container:
|
||||
image: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/rust:pinned
|
||||
options: --init
|
||||
needs: [ regress-tests ]
|
||||
if: github.ref_name == 'main'
|
||||
steps:
|
||||
- name: Copy Neon artifact to the latest directory
|
||||
shell: bash -euxo pipefail {0}
|
||||
env:
|
||||
BUCKET: neon-github-public-dev
|
||||
PREFIX: artifacts/${{ github.run_id }}
|
||||
run: |
|
||||
for build_type in debug release; do
|
||||
FILENAME=neon-${{ runner.os }}-${build_type}-artifact.tar.zst
|
||||
|
||||
S3_KEY=$(aws s3api list-objects-v2 --bucket ${BUCKET} --prefix ${PREFIX} | jq -r '.Contents[].Key' | grep ${FILENAME} | sort --version-sort | tail -1 || true)
|
||||
if [ -z "${S3_KEY}" ]; then
|
||||
echo 2>&1 "Neither s3://${BUCKET}/${PREFIX}/${FILENAME} nor its version from previous attempts exist"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
time aws s3 cp --only-show-errors s3://${BUCKET}/${S3_KEY} s3://${BUCKET}/artifacts/latest/${FILENAME}
|
||||
done
|
||||
|
||||
benchmarks:
|
||||
runs-on: dev
|
||||
container:
|
||||
@@ -335,9 +361,6 @@ jobs:
|
||||
curl --fail --output suites.json ${REPORT_URL%/index.html}/data/suites.json
|
||||
./scripts/pysync
|
||||
|
||||
# Workaround for https://github.com/neondatabase/cloud/issues/2188
|
||||
psql "$TEST_RESULT_CONNSTR" -c "SELECT 1;" || sleep 10
|
||||
|
||||
DATABASE_URL="$TEST_RESULT_CONNSTR" poetry run python3 scripts/ingest_regress_test_result.py --revision ${SHA} --reference ${GITHUB_REF} --build-type ${BUILD_TYPE} --ingest suites.json
|
||||
|
||||
coverage-report:
|
||||
@@ -588,7 +611,16 @@ jobs:
|
||||
- name: Pull rust image from ECR
|
||||
run: crane pull 369495373322.dkr.ecr.eu-central-1.amazonaws.com/rust:pinned rust
|
||||
|
||||
- name: Configure docker login
|
||||
- name: Push images to production ECR
|
||||
if: |
|
||||
(github.ref_name == 'main' || github.ref_name == 'release') &&
|
||||
github.event_name != 'workflow_dispatch'
|
||||
run: |
|
||||
crane copy 369495373322.dkr.ecr.eu-central-1.amazonaws.com/neon:$GITHUB_RUN_ID 093970136003.dkr.ecr.us-east-2.amazonaws.com/neon:latest
|
||||
crane copy 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-tools:$GITHUB_RUN_ID 093970136003.dkr.ecr.us-east-2.amazonaws.com/compute-tools:latest
|
||||
crane copy 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node:$GITHUB_RUN_ID 093970136003.dkr.ecr.us-east-2.amazonaws.com/compute-node:latest
|
||||
|
||||
- name: Configure Docker Hub login
|
||||
run: |
|
||||
# ECR Credential Helper & Docker Hub don't work together in config, hence reset
|
||||
echo "" > /github/home/.docker/config.json
|
||||
@@ -609,7 +641,7 @@ jobs:
|
||||
- name: Push rust image to Docker Hub
|
||||
run: crane push rust neondatabase/rust:pinned
|
||||
|
||||
- name: Add latest tag to images
|
||||
- name: Add latest tag to images in Docker Hub
|
||||
if: |
|
||||
(github.ref_name == 'main' || github.ref_name == 'release') &&
|
||||
github.event_name != 'workflow_dispatch'
|
||||
|
||||
@@ -19,9 +19,8 @@ COPY --chown=nonroot scripts/ninstall.sh scripts/ninstall.sh
|
||||
ENV BUILD_TYPE release
|
||||
RUN set -e \
|
||||
&& mold -run make -j $(nproc) -s neon-pg-ext \
|
||||
&& rm -rf pg_install/v14/build \
|
||||
&& rm -rf pg_install/v15/build \
|
||||
&& tar -C pg_install/v14 -czf /home/nonroot/postgres_install.tar.gz .
|
||||
&& rm -rf pg_install/build \
|
||||
&& tar -C pg_install -czf /home/nonroot/postgres_install.tar.gz .
|
||||
|
||||
# Build neon binaries
|
||||
FROM $REPOSITORY/$IMAGE:$TAG AS build
|
||||
|
||||
@@ -8,9 +8,12 @@ ARG TAG=pinned
|
||||
# Layer "build-deps"
|
||||
#
|
||||
FROM debian:bullseye-slim AS build-deps
|
||||
RUN echo "deb http://ftp.debian.org/debian testing main" >> /etc/apt/sources.list && \
|
||||
echo "APT::Default-Release \"stable\";" > /etc/apt/apt.conf.d/default-release && \
|
||||
apt update
|
||||
RUN apt update && \
|
||||
apt install -y git autoconf automake libtool build-essential bison flex libreadline-dev zlib1g-dev libxml2-dev \
|
||||
libcurl4-openssl-dev libossp-uuid-dev
|
||||
libcurl4-openssl-dev libossp-uuid-dev wget pkg-config libglib2.0-dev
|
||||
|
||||
#
|
||||
# Layer "pg-build"
|
||||
@@ -37,7 +40,7 @@ RUN cd postgres && \
|
||||
FROM build-deps AS postgis-build
|
||||
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
RUN apt update && \
|
||||
apt install -y gdal-bin libgdal-dev libprotobuf-c-dev protobuf-c-compiler xsltproc wget
|
||||
apt install -y gdal-bin libgdal-dev libprotobuf-c-dev protobuf-c-compiler xsltproc
|
||||
|
||||
RUN wget https://download.osgeo.org/postgis/source/postgis-3.3.0.tar.gz && \
|
||||
tar xvzf postgis-3.3.0.tar.gz && \
|
||||
@@ -59,15 +62,13 @@ RUN wget https://download.osgeo.org/postgis/source/postgis-3.3.0.tar.gz && \
|
||||
# Build plv8
|
||||
#
|
||||
FROM build-deps AS plv8-build
|
||||
COPY --from=postgis-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
RUN apt update && \
|
||||
apt install -y git curl wget make ninja-build build-essential libncurses5 python3-dev pkg-config libc++-dev libc++abi-dev libglib2.0-dev
|
||||
apt install -y ninja-build python3-dev libc++-dev libc++abi-dev libncurses5
|
||||
|
||||
# https://github.com/plv8/plv8/issues/475
|
||||
# Debian bullseye provides binutils 2.35 when >= 2.38 is necessary
|
||||
RUN echo "deb http://ftp.debian.org/debian testing main" >> /etc/apt/sources.list && \
|
||||
echo "APT::Default-Release \"stable\";" > /etc/apt/apt.conf.d/default-release && \
|
||||
apt update && \
|
||||
RUN apt update && \
|
||||
apt install -y --no-install-recommends -t testing binutils
|
||||
|
||||
RUN wget https://github.com/plv8/plv8/archive/refs/tags/v3.1.4.tar.gz && \
|
||||
@@ -79,12 +80,46 @@ RUN wget https://github.com/plv8/plv8/archive/refs/tags/v3.1.4.tar.gz && \
|
||||
rm -rf /plv8-* && \
|
||||
echo 'trusted = true' >> /usr/local/pgsql/share/extension/plv8.control
|
||||
|
||||
#
|
||||
# Layer "h3-pg-build"
|
||||
# Build h3_pg
|
||||
#
|
||||
FROM build-deps AS h3-pg-build
|
||||
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
|
||||
# packaged cmake is too old
|
||||
RUN apt update && \
|
||||
apt install -y --no-install-recommends -t testing cmake
|
||||
|
||||
RUN wget https://github.com/uber/h3/archive/refs/tags/v4.0.1.tar.gz -O h3.tgz && \
|
||||
tar xvzf h3.tgz && \
|
||||
cd h3-4.0.1 && \
|
||||
mkdir build && \
|
||||
cd build && \
|
||||
cmake .. -DCMAKE_BUILD_TYPE=Release && \
|
||||
make -j $(getconf _NPROCESSORS_ONLN) && \
|
||||
DESTDIR=/h3 make install && \
|
||||
cp -R /h3/usr / && \
|
||||
rm -rf build
|
||||
|
||||
RUN wget https://github.com/zachasme/h3-pg/archive/refs/tags/v4.0.1.tar.gz -O h3-pg.tgz && \
|
||||
tar xvzf h3-pg.tgz && \
|
||||
cd h3-pg-4.0.1 && \
|
||||
export PATH="/usr/local/pgsql/bin:$PATH" && \
|
||||
make -j $(getconf _NPROCESSORS_ONLN) && \
|
||||
make -j $(getconf _NPROCESSORS_ONLN) install && \
|
||||
echo 'trusted = true' >> /usr/local/pgsql/share/extension/h3.control
|
||||
|
||||
#
|
||||
# Layer "neon-pg-ext-build"
|
||||
# compile neon extensions
|
||||
#
|
||||
FROM build-deps AS neon-pg-ext-build
|
||||
COPY --from=postgis-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
# plv8 still sometimes crashes during the creation
|
||||
# COPY --from=plv8-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY --from=h3-pg-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY --from=h3-pg-build /h3/usr /
|
||||
COPY pgxn/ pgxn/
|
||||
|
||||
RUN make -j $(getconf _NPROCESSORS_ONLN) \
|
||||
@@ -132,8 +167,6 @@ RUN mkdir /var/db && useradd -m -d /var/db/postgres postgres && \
|
||||
chmod 0750 /var/db/postgres/compute && \
|
||||
echo '/usr/local/lib' >> /etc/ld.so.conf && /sbin/ldconfig
|
||||
|
||||
# TODO: Check if we can make the extension setup more modular versus a linear build
|
||||
# currently plv8-build copies the output /usr/local/pgsql from postgis-build, etc#
|
||||
COPY --from=postgres-cleanup-layer --chown=postgres /usr/local/pgsql /usr/local
|
||||
COPY --from=compute-tools --chown=postgres /home/nonroot/target/release-line-debug-size-lto/compute_ctl /usr/local/bin/compute_ctl
|
||||
|
||||
|
||||
@@ -170,7 +170,7 @@ pub fn find_end_of_wal(
|
||||
let mut curr_lsn = start_lsn;
|
||||
let mut buf = [0u8; XLOG_BLCKSZ];
|
||||
let pg_version = PG_MAJORVERSION[1..3].parse::<u32>().unwrap();
|
||||
info!("find_end_of_wal PG_VERSION: {}", pg_version);
|
||||
debug!("find_end_of_wal PG_VERSION: {}", pg_version);
|
||||
|
||||
let mut decoder = WalStreamDecoder::new(start_lsn, pg_version);
|
||||
|
||||
@@ -182,7 +182,7 @@ pub fn find_end_of_wal(
|
||||
match open_wal_segment(&seg_file_path)? {
|
||||
None => {
|
||||
// no more segments
|
||||
info!(
|
||||
debug!(
|
||||
"find_end_of_wal reached end at {:?}, segment {:?} doesn't exist",
|
||||
result, seg_file_path
|
||||
);
|
||||
@@ -205,7 +205,7 @@ pub fn find_end_of_wal(
|
||||
match decoder.poll_decode() {
|
||||
Ok(Some(record)) => result = record.0,
|
||||
Err(e) => {
|
||||
info!(
|
||||
debug!(
|
||||
"find_end_of_wal reached end at {:?}, decode error: {:?}",
|
||||
result, e
|
||||
);
|
||||
|
||||
@@ -240,7 +240,6 @@ where
|
||||
mod tests {
|
||||
use super::*;
|
||||
use std::sync::Arc;
|
||||
use std::thread::sleep;
|
||||
use std::time::Duration;
|
||||
|
||||
impl MonotonicCounter<i32> for i32 {
|
||||
@@ -258,17 +257,19 @@ mod tests {
|
||||
let seq = Arc::new(SeqWait::new(0));
|
||||
let seq2 = Arc::clone(&seq);
|
||||
let seq3 = Arc::clone(&seq);
|
||||
tokio::task::spawn(async move {
|
||||
let jh1 = tokio::task::spawn(async move {
|
||||
seq2.wait_for(42).await.expect("wait_for 42");
|
||||
let old = seq2.advance(100);
|
||||
assert_eq!(old, 99);
|
||||
seq2.wait_for(999).await.expect_err("no 999");
|
||||
seq2.wait_for_timeout(999, Duration::from_millis(100))
|
||||
.await
|
||||
.expect_err("no 999");
|
||||
});
|
||||
tokio::task::spawn(async move {
|
||||
let jh2 = tokio::task::spawn(async move {
|
||||
seq3.wait_for(42).await.expect("wait_for 42");
|
||||
seq3.wait_for(0).await.expect("wait_for 0");
|
||||
});
|
||||
sleep(Duration::from_secs(1));
|
||||
tokio::time::sleep(Duration::from_millis(200)).await;
|
||||
let old = seq.advance(99);
|
||||
assert_eq!(old, 0);
|
||||
seq.wait_for(100).await.expect("wait_for 100");
|
||||
@@ -277,6 +278,9 @@ mod tests {
|
||||
assert_eq!(seq.advance(98), 100);
|
||||
assert_eq!(seq.load(), 100);
|
||||
|
||||
jh1.await.unwrap();
|
||||
jh2.await.unwrap();
|
||||
|
||||
seq.shutdown();
|
||||
}
|
||||
|
||||
@@ -284,15 +288,18 @@ mod tests {
|
||||
async fn seqwait_timeout() {
|
||||
let seq = Arc::new(SeqWait::new(0));
|
||||
let seq2 = Arc::clone(&seq);
|
||||
tokio::task::spawn(async move {
|
||||
let jh = tokio::task::spawn(async move {
|
||||
let timeout = Duration::from_millis(1);
|
||||
let res = seq2.wait_for_timeout(42, timeout).await;
|
||||
assert_eq!(res, Err(SeqWaitError::Timeout));
|
||||
});
|
||||
tokio::time::sleep(Duration::from_secs(1)).await;
|
||||
tokio::time::sleep(Duration::from_millis(200)).await;
|
||||
// This will attempt to wake, but nothing will happen
|
||||
// because the waiter already dropped its Receiver.
|
||||
let old = seq.advance(99);
|
||||
assert_eq!(old, 0)
|
||||
assert_eq!(old, 0);
|
||||
jh.await.unwrap();
|
||||
|
||||
seq.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
7
logfile
7
logfile
@@ -1,7 +0,0 @@
|
||||
2022-09-22 12:07:18.140 EEST [463605] LOG: starting PostgreSQL 15beta3 on x86_64-pc-linux-gnu, compiled by gcc (Ubuntu 9.4.0-1ubuntu1~20.04.1) 9.4.0, 64-bit
|
||||
2022-09-22 12:07:18.140 EEST [463605] LOG: listening on IPv4 address "127.0.0.1", port 15331
|
||||
2022-09-22 12:07:18.142 EEST [463605] LOG: listening on Unix socket "/tmp/.s.PGSQL.15331"
|
||||
2022-09-22 12:07:18.145 EEST [463608] LOG: database system was shut down at 2022-09-22 12:07:17 EEST
|
||||
2022-09-22 12:07:18.149 EEST [463605] LOG: database system is ready to accept connections
|
||||
2022-09-22 12:07:18.211 EEST [463605] LOG: received immediate shutdown request
|
||||
2022-09-22 12:07:18.218 EEST [463605] LOG: database system is shut down
|
||||
@@ -1,8 +1,9 @@
|
||||
use metrics::core::{AtomicU64, GenericCounter};
|
||||
use metrics::{
|
||||
register_histogram, register_histogram_vec, register_int_counter, register_int_counter_vec,
|
||||
register_int_gauge, register_int_gauge_vec, register_uint_gauge_vec, Histogram, HistogramVec,
|
||||
IntCounter, IntCounterVec, IntGauge, IntGaugeVec, UIntGauge, UIntGaugeVec,
|
||||
register_gauge_vec, register_histogram, register_histogram_vec, register_int_counter,
|
||||
register_int_counter_vec, register_int_gauge, register_int_gauge_vec, register_uint_gauge_vec,
|
||||
GaugeVec, Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, UIntGauge,
|
||||
UIntGaugeVec,
|
||||
};
|
||||
use once_cell::sync::Lazy;
|
||||
use utils::id::{TenantId, TimelineId};
|
||||
@@ -204,12 +205,34 @@ pub static REMAINING_SYNC_ITEMS: Lazy<IntGauge> = Lazy::new(|| {
|
||||
.expect("failed to register pageserver remote storage remaining sync items int gauge")
|
||||
});
|
||||
|
||||
pub static IMAGE_SYNC_TIME: Lazy<HistogramVec> = Lazy::new(|| {
|
||||
pub static IMAGE_SYNC_TIME: Lazy<GaugeVec> = Lazy::new(|| {
|
||||
register_gauge_vec!(
|
||||
"pageserver_remote_storage_image_sync_duration",
|
||||
"Time spent to synchronize (up/download) a whole pageserver image",
|
||||
&["tenant_id", "timeline_id"],
|
||||
)
|
||||
.expect("failed to register per-timeline pageserver image sync time vec")
|
||||
});
|
||||
|
||||
pub static IMAGE_SYNC_OPERATION_KINDS: &[&str] = &["upload", "download", "delete"];
|
||||
pub static IMAGE_SYNC_STATUS: &[&str] = &["success", "failure", "abort"];
|
||||
|
||||
pub static IMAGE_SYNC_COUNT: Lazy<IntCounterVec> = Lazy::new(|| {
|
||||
register_int_counter_vec!(
|
||||
"pageserver_remote_storage_image_sync_count",
|
||||
"Number of synchronization operations executed for pageserver images. \
|
||||
Grouped by tenant, timeline, operation_kind and status",
|
||||
&["tenant_id", "timeline_id", "operation_kind", "status"]
|
||||
)
|
||||
.expect("failed to register pageserver image sync count vec")
|
||||
});
|
||||
|
||||
pub static IMAGE_SYNC_TIME_HISTOGRAM: Lazy<HistogramVec> = Lazy::new(|| {
|
||||
register_histogram_vec!(
|
||||
"pageserver_remote_storage_image_sync_seconds",
|
||||
"Time took to synchronize (download or upload) a whole pageserver image. \
|
||||
Grouped by tenant and timeline ids, `operation_kind` (upload|download) and `status` (success|failure)",
|
||||
&["tenant_id", "timeline_id", "operation_kind", "status"],
|
||||
Grouped by operation_kind and status",
|
||||
&["operation_kind", "status"],
|
||||
vec![0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 3.0, 10.0, 20.0]
|
||||
)
|
||||
.expect("failed to register pageserver image sync time histogram vec")
|
||||
@@ -256,7 +279,7 @@ macro_rules! redo_histogram_time_buckets {
|
||||
() => {
|
||||
vec![
|
||||
0.000_005, 0.000_010, 0.000_025, 0.000_050, 0.000_100, 0.000_250, 0.000_500, 0.001_000,
|
||||
0.002_500, 0.005_000, 0.010_000, 0.025_000, 0.050_000,
|
||||
0.002_500, 0.005_000, 0.010_000, 0.025_000, 0.050_000, 0.100_000, 0.250_000,
|
||||
]
|
||||
};
|
||||
}
|
||||
@@ -411,6 +434,14 @@ impl Drop for TimelineMetrics {
|
||||
for op in SMGR_QUERY_TIME_OPERATIONS {
|
||||
let _ = SMGR_QUERY_TIME.remove_label_values(&[op, tenant_id, timeline_id]);
|
||||
}
|
||||
|
||||
for op in IMAGE_SYNC_OPERATION_KINDS {
|
||||
for status in IMAGE_SYNC_STATUS {
|
||||
let _ = IMAGE_SYNC_COUNT.remove_label_values(&[tenant_id, timeline_id, op, status]);
|
||||
}
|
||||
}
|
||||
|
||||
let _ = IMAGE_SYNC_TIME.remove_label_values(&[tenant_id, timeline_id]);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -667,7 +667,7 @@ impl PageServerHandler {
|
||||
Ok(lsn)
|
||||
}
|
||||
|
||||
#[instrument(skip(timeline, req), fields(rel = %req.rel, req_lsn = %req.lsn))]
|
||||
#[instrument(skip(self, timeline, req), fields(rel = %req.rel, req_lsn = %req.lsn))]
|
||||
async fn handle_get_rel_exists_request(
|
||||
&self,
|
||||
timeline: &Timeline,
|
||||
@@ -684,7 +684,7 @@ impl PageServerHandler {
|
||||
}))
|
||||
}
|
||||
|
||||
#[instrument(skip(timeline, req), fields(rel = %req.rel, req_lsn = %req.lsn))]
|
||||
#[instrument(skip(self, timeline, req), fields(rel = %req.rel, req_lsn = %req.lsn))]
|
||||
async fn handle_get_nblocks_request(
|
||||
&self,
|
||||
timeline: &Timeline,
|
||||
@@ -701,7 +701,7 @@ impl PageServerHandler {
|
||||
}))
|
||||
}
|
||||
|
||||
#[instrument(skip(timeline, req), fields(dbnode = %req.dbnode, req_lsn = %req.lsn))]
|
||||
#[instrument(skip(self, timeline, req), fields(dbnode = %req.dbnode, req_lsn = %req.lsn))]
|
||||
async fn handle_db_size_request(
|
||||
&self,
|
||||
timeline: &Timeline,
|
||||
@@ -721,7 +721,7 @@ impl PageServerHandler {
|
||||
}))
|
||||
}
|
||||
|
||||
#[instrument(skip(timeline, req), fields(rel = %req.rel, blkno = %req.blkno, req_lsn = %req.lsn))]
|
||||
#[instrument(skip(self, timeline, req), fields(rel = %req.rel, blkno = %req.blkno, req_lsn = %req.lsn))]
|
||||
async fn handle_get_page_at_lsn_request(
|
||||
&self,
|
||||
timeline: &Timeline,
|
||||
@@ -1023,6 +1023,9 @@ impl postgres_backend_async::Handler for PageServerHandler {
|
||||
let params = params_raw.split(' ').collect::<Vec<_>>();
|
||||
ensure!(params.len() == 1, "invalid param number for config command");
|
||||
let tenant_id = TenantId::from_str(params[0])?;
|
||||
|
||||
self.check_permission(Some(tenant_id))?;
|
||||
|
||||
let tenant = tenant_mgr::get_tenant(tenant_id, true)?;
|
||||
pgb.write_message(&BeMessage::RowDescription(&[
|
||||
RowDescriptor::int8_col(b"checkpoint_distance"),
|
||||
@@ -1067,14 +1070,14 @@ impl postgres_backend_async::Handler for PageServerHandler {
|
||||
let caps = re
|
||||
.captures(query_string)
|
||||
.with_context(|| format!("invalid get_lsn_by_timestamp: '{}'", query_string))?;
|
||||
|
||||
let tenant_id = TenantId::from_str(caps.get(1).unwrap().as_str())?;
|
||||
let timeline_id = TimelineId::from_str(caps.get(2).unwrap().as_str())?;
|
||||
let timeline = get_local_timeline(tenant_id, timeline_id)?;
|
||||
|
||||
let timestamp = humantime::parse_rfc3339(caps.get(3).unwrap().as_str())?;
|
||||
let timestamp_pg = to_pg_timestamp(timestamp);
|
||||
|
||||
self.check_permission(Some(tenant_id))?;
|
||||
|
||||
let timeline = get_local_timeline(tenant_id, timeline_id)?;
|
||||
pgb.write_message(&BeMessage::RowDescription(&[RowDescriptor::text_col(
|
||||
b"lsn",
|
||||
)]))?;
|
||||
|
||||
@@ -178,6 +178,7 @@ use crate::{
|
||||
TenantTimelineValues,
|
||||
};
|
||||
|
||||
use crate::metrics::{IMAGE_SYNC_COUNT, IMAGE_SYNC_TIME_HISTOGRAM};
|
||||
use utils::id::{TenantId, TenantTimelineId, TimelineId};
|
||||
|
||||
use self::download::download_index_parts;
|
||||
@@ -835,7 +836,6 @@ async fn process_sync_task_batch(
|
||||
sync_id,
|
||||
upload_data,
|
||||
sync_start,
|
||||
"upload",
|
||||
)
|
||||
.await
|
||||
}
|
||||
@@ -879,7 +879,6 @@ async fn process_sync_task_batch(
|
||||
sync_id,
|
||||
download_data,
|
||||
sync_start,
|
||||
"download",
|
||||
)
|
||||
.await;
|
||||
}
|
||||
@@ -911,7 +910,6 @@ async fn process_sync_task_batch(
|
||||
sync_id,
|
||||
delete_data,
|
||||
sync_start,
|
||||
"delete",
|
||||
)
|
||||
.instrument(info_span!("delete_timeline_data"))
|
||||
.await;
|
||||
@@ -948,8 +946,9 @@ async fn download_timeline_data(
|
||||
sync_id: TenantTimelineId,
|
||||
new_download_data: SyncData<LayersDownload>,
|
||||
sync_start: Instant,
|
||||
task_name: &str,
|
||||
) -> DownloadStatus {
|
||||
static TASK_NAME: &str = "download";
|
||||
|
||||
match download_timeline_layers(
|
||||
conf,
|
||||
storage,
|
||||
@@ -961,19 +960,19 @@ async fn download_timeline_data(
|
||||
.await
|
||||
{
|
||||
DownloadedTimeline::Abort => {
|
||||
register_sync_status(sync_id, sync_start, task_name, None);
|
||||
register_sync_status(sync_id, sync_start, TASK_NAME, None);
|
||||
if let Err(e) = index.write().await.set_awaits_download(&sync_id, false) {
|
||||
error!("Timeline {sync_id} was expected to be in the remote index after a download attempt, but it's absent: {e:?}");
|
||||
}
|
||||
}
|
||||
DownloadedTimeline::FailedAndRescheduled => {
|
||||
register_sync_status(sync_id, sync_start, task_name, Some(false));
|
||||
register_sync_status(sync_id, sync_start, TASK_NAME, Some(false));
|
||||
}
|
||||
DownloadedTimeline::Successful(mut download_data) => {
|
||||
match update_local_metadata(conf, sync_id, current_remote_timeline).await {
|
||||
Ok(()) => match index.write().await.set_awaits_download(&sync_id, false) {
|
||||
Ok(()) => {
|
||||
register_sync_status(sync_id, sync_start, task_name, Some(true));
|
||||
register_sync_status(sync_id, sync_start, TASK_NAME, Some(true));
|
||||
return DownloadStatus::Downloaded;
|
||||
}
|
||||
Err(e) => {
|
||||
@@ -984,7 +983,7 @@ async fn download_timeline_data(
|
||||
error!("Failed to update local timeline metadata: {e:?}");
|
||||
download_data.retries += 1;
|
||||
sync_queue.push(sync_id, SyncTask::Download(download_data));
|
||||
register_sync_status(sync_id, sync_start, task_name, Some(false));
|
||||
register_sync_status(sync_id, sync_start, TASK_NAME, Some(false));
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1060,8 +1059,9 @@ async fn delete_timeline_data(
|
||||
sync_id: TenantTimelineId,
|
||||
mut new_delete_data: SyncData<LayersDeletion>,
|
||||
sync_start: Instant,
|
||||
task_name: &str,
|
||||
) {
|
||||
static TASK_NAME: &str = "delete";
|
||||
|
||||
let timeline_delete = &mut new_delete_data.data;
|
||||
|
||||
if !timeline_delete.deletion_registered {
|
||||
@@ -1077,14 +1077,14 @@ async fn delete_timeline_data(
|
||||
error!("Failed to update remote timeline {sync_id}: {e:?}");
|
||||
new_delete_data.retries += 1;
|
||||
sync_queue.push(sync_id, SyncTask::Delete(new_delete_data));
|
||||
register_sync_status(sync_id, sync_start, task_name, Some(false));
|
||||
register_sync_status(sync_id, sync_start, TASK_NAME, Some(false));
|
||||
return;
|
||||
}
|
||||
}
|
||||
timeline_delete.deletion_registered = true;
|
||||
|
||||
let sync_status = delete_timeline_layers(storage, sync_queue, sync_id, new_delete_data).await;
|
||||
register_sync_status(sync_id, sync_start, task_name, Some(sync_status));
|
||||
register_sync_status(sync_id, sync_start, TASK_NAME, Some(sync_status));
|
||||
}
|
||||
|
||||
async fn read_metadata_file(metadata_path: &Path) -> anyhow::Result<TimelineMetadata> {
|
||||
@@ -1103,8 +1103,8 @@ async fn upload_timeline_data(
|
||||
sync_id: TenantTimelineId,
|
||||
new_upload_data: SyncData<LayersUpload>,
|
||||
sync_start: Instant,
|
||||
task_name: &str,
|
||||
) -> UploadStatus {
|
||||
static TASK_NAME: &str = "upload";
|
||||
let mut uploaded_data = match upload_timeline_layers(
|
||||
storage,
|
||||
sync_queue,
|
||||
@@ -1115,7 +1115,7 @@ async fn upload_timeline_data(
|
||||
.await
|
||||
{
|
||||
UploadedTimeline::FailedAndRescheduled(e) => {
|
||||
register_sync_status(sync_id, sync_start, task_name, Some(false));
|
||||
register_sync_status(sync_id, sync_start, TASK_NAME, Some(false));
|
||||
return UploadStatus::Failed(e);
|
||||
}
|
||||
UploadedTimeline::Successful(upload_data) => upload_data,
|
||||
@@ -1134,14 +1134,14 @@ async fn upload_timeline_data(
|
||||
.await
|
||||
{
|
||||
Ok(()) => {
|
||||
register_sync_status(sync_id, sync_start, task_name, Some(true));
|
||||
register_sync_status(sync_id, sync_start, TASK_NAME, Some(true));
|
||||
UploadStatus::Uploaded
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Failed to update remote timeline {sync_id}: {e:?}");
|
||||
uploaded_data.retries += 1;
|
||||
sync_queue.push(sync_id, SyncTask::Upload(uploaded_data));
|
||||
register_sync_status(sync_id, sync_start, task_name, Some(false));
|
||||
register_sync_status(sync_id, sync_start, TASK_NAME, Some(false));
|
||||
UploadStatus::Failed(e)
|
||||
}
|
||||
}
|
||||
@@ -1391,16 +1391,22 @@ fn register_sync_status(
|
||||
|
||||
let tenant_id = sync_id.tenant_id.to_string();
|
||||
let timeline_id = sync_id.timeline_id.to_string();
|
||||
match sync_status {
|
||||
Some(true) => {
|
||||
IMAGE_SYNC_TIME.with_label_values(&[&tenant_id, &timeline_id, sync_name, "success"])
|
||||
}
|
||||
Some(false) => {
|
||||
IMAGE_SYNC_TIME.with_label_values(&[&tenant_id, &timeline_id, sync_name, "failure"])
|
||||
}
|
||||
None => return,
|
||||
}
|
||||
.observe(secs_elapsed)
|
||||
|
||||
let sync_status = match sync_status {
|
||||
Some(true) => "success",
|
||||
Some(false) => "failure",
|
||||
None => "abort",
|
||||
};
|
||||
|
||||
IMAGE_SYNC_TIME_HISTOGRAM
|
||||
.with_label_values(&[sync_name, sync_status])
|
||||
.observe(secs_elapsed);
|
||||
IMAGE_SYNC_TIME
|
||||
.with_label_values(&[&tenant_id, &timeline_id])
|
||||
.add(secs_elapsed);
|
||||
IMAGE_SYNC_COUNT
|
||||
.with_label_values(&[&tenant_id, &timeline_id, sync_name, sync_status])
|
||||
.inc();
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
||||
@@ -17,7 +17,6 @@ use tracing::*;
|
||||
use utils::crashsafe_dir::path_with_suffix_extension;
|
||||
|
||||
use std::cmp::min;
|
||||
use std::collections::hash_map;
|
||||
use std::collections::hash_map::Entry;
|
||||
use std::collections::BTreeSet;
|
||||
use std::collections::HashMap;
|
||||
@@ -246,12 +245,12 @@ impl Tenant {
|
||||
let ancestor_ancestor_lsn = ancestor_timeline.get_ancestor_lsn();
|
||||
if ancestor_ancestor_lsn > *lsn {
|
||||
// can we safely just branch from the ancestor instead?
|
||||
anyhow::bail!(
|
||||
"invalid start lsn {} for ancestor timeline {}: less than timeline ancestor lsn {}",
|
||||
lsn,
|
||||
ancestor_timeline_id,
|
||||
ancestor_ancestor_lsn,
|
||||
);
|
||||
bail!(
|
||||
"invalid start lsn {} for ancestor timeline {}: less than timeline ancestor lsn {}",
|
||||
lsn,
|
||||
ancestor_timeline_id,
|
||||
ancestor_ancestor_lsn,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -406,11 +405,11 @@ impl Tenant {
|
||||
.with_context(|| format!("Failed to initialize timeline {timeline_id}"))?;
|
||||
|
||||
match timelines_accessor.entry(timeline.timeline_id) {
|
||||
hash_map::Entry::Occupied(_) => anyhow::bail!(
|
||||
Entry::Occupied(_) => bail!(
|
||||
"Found freshly initialized timeline {} in the tenant map",
|
||||
timeline.timeline_id
|
||||
),
|
||||
hash_map::Entry::Vacant(v) => {
|
||||
Entry::Vacant(v) => {
|
||||
v.insert(timeline);
|
||||
}
|
||||
}
|
||||
@@ -768,7 +767,7 @@ impl Tenant {
|
||||
})
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"Failed to fsync on firts save for config {}",
|
||||
"Failed to fsync on first save for config {}",
|
||||
target_config_path.display()
|
||||
)
|
||||
})?;
|
||||
@@ -1091,11 +1090,11 @@ impl Tenant {
|
||||
})?;
|
||||
|
||||
match timelines.entry(new_timeline_id) {
|
||||
hash_map::Entry::Occupied(_) => anyhow::bail!(
|
||||
Entry::Occupied(_) => bail!(
|
||||
"Found freshly initialized timeline {} in the tenant map",
|
||||
new_timeline_id
|
||||
),
|
||||
hash_map::Entry::Vacant(v) => {
|
||||
Entry::Vacant(v) => {
|
||||
v.insert(Arc::clone(&new_timeline));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -343,7 +343,9 @@ impl Timeline {
|
||||
match cached_lsn.cmp(&lsn) {
|
||||
Ordering::Less => {} // there might be WAL between cached_lsn and lsn, we need to check
|
||||
Ordering::Equal => return Ok(cached_img), // exact LSN match, return the image
|
||||
Ordering::Greater => panic!(), // the returned lsn should never be after the requested lsn
|
||||
Ordering::Greater => {
|
||||
unreachable!("the returned lsn should never be after the requested lsn")
|
||||
}
|
||||
}
|
||||
Some((cached_lsn, cached_img))
|
||||
}
|
||||
@@ -726,10 +728,10 @@ impl Timeline {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn layer_removal_guard(&self) -> Result<MutexGuard<()>, anyhow::Error> {
|
||||
pub fn layer_removal_guard(&self) -> anyhow::Result<MutexGuard<()>> {
|
||||
self.layer_removal_cs
|
||||
.try_lock()
|
||||
.map_err(|e| anyhow::anyhow!("cannot lock compaction critical section {e}"))
|
||||
.map_err(|e| anyhow!("cannot lock compaction critical section {e}"))
|
||||
}
|
||||
|
||||
/// Retrieve current logical size of the timeline.
|
||||
|
||||
@@ -31,7 +31,6 @@ use etcd_broker::Client;
|
||||
use itertools::Itertools;
|
||||
use once_cell::sync::OnceCell;
|
||||
use std::future::Future;
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::watch;
|
||||
use tracing::*;
|
||||
use url::Url;
|
||||
@@ -88,37 +87,44 @@ pub fn is_etcd_client_initialized() -> bool {
|
||||
/// That may lead to certain events not being observed by the listener.
|
||||
#[derive(Debug)]
|
||||
pub struct TaskHandle<E> {
|
||||
events_receiver: watch::Receiver<TaskEvent<E>>,
|
||||
join_handle: Option<tokio::task::JoinHandle<anyhow::Result<()>>>,
|
||||
events_receiver: watch::Receiver<TaskStateUpdate<E>>,
|
||||
cancellation: watch::Sender<()>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum TaskEvent<E> {
|
||||
Update(TaskStateUpdate<E>),
|
||||
End(anyhow::Result<()>),
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum TaskStateUpdate<E> {
|
||||
Init,
|
||||
Started,
|
||||
NewEvent(E),
|
||||
End,
|
||||
Progress(E),
|
||||
}
|
||||
|
||||
impl<E: Clone> TaskHandle<E> {
|
||||
/// Initializes the task, starting it immediately after the creation.
|
||||
pub fn spawn<Fut>(
|
||||
task: impl FnOnce(Arc<watch::Sender<TaskEvent<E>>>, watch::Receiver<()>) -> Fut + Send + 'static,
|
||||
task: impl FnOnce(watch::Sender<TaskStateUpdate<E>>, watch::Receiver<()>) -> Fut
|
||||
+ Send
|
||||
+ 'static,
|
||||
) -> Self
|
||||
where
|
||||
Fut: Future<Output = Result<(), String>> + Send,
|
||||
E: Sync + Send + 'static,
|
||||
Fut: Future<Output = anyhow::Result<()>> + Send,
|
||||
E: Send + Sync + 'static,
|
||||
{
|
||||
let (cancellation, cancellation_receiver) = watch::channel(());
|
||||
let (events_sender, events_receiver) = watch::channel(TaskEvent::Started);
|
||||
let events_sender = Arc::new(events_sender);
|
||||
let (events_sender, events_receiver) = watch::channel(TaskStateUpdate::Started);
|
||||
|
||||
let sender = Arc::clone(&events_sender);
|
||||
let _ = WALRECEIVER_RUNTIME.spawn(async move {
|
||||
events_sender.send(TaskEvent::Started).ok();
|
||||
task(sender, cancellation_receiver).await
|
||||
let join_handle = WALRECEIVER_RUNTIME.spawn(async move {
|
||||
events_sender.send(TaskStateUpdate::Started).ok();
|
||||
task(events_sender, cancellation_receiver).await
|
||||
});
|
||||
|
||||
TaskHandle {
|
||||
join_handle: Some(join_handle),
|
||||
events_receiver,
|
||||
cancellation,
|
||||
}
|
||||
@@ -126,15 +132,45 @@ impl<E: Clone> TaskHandle<E> {
|
||||
|
||||
async fn next_task_event(&mut self) -> TaskEvent<E> {
|
||||
match self.events_receiver.changed().await {
|
||||
Ok(()) => self.events_receiver.borrow().clone(),
|
||||
Err(_task_channel_part_dropped) => TaskEvent::End,
|
||||
Ok(()) => TaskEvent::Update((self.events_receiver.borrow()).clone()),
|
||||
Err(_task_channel_part_dropped) => {
|
||||
TaskEvent::End(match self.join_handle.take() {
|
||||
Some(jh) => {
|
||||
if !jh.is_finished() {
|
||||
warn!("sender is dropped while join handle is still alive");
|
||||
}
|
||||
|
||||
jh.await
|
||||
.map_err(|e| anyhow::anyhow!("Failed to join task: {e}"))
|
||||
.and_then(|x| x)
|
||||
}
|
||||
None => {
|
||||
// Another option is to have an enum, join handle or result and give away the reference to it
|
||||
Err(anyhow::anyhow!("Task was joined more than once"))
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Aborts current task, waiting for it to finish.
|
||||
pub async fn shutdown(mut self) {
|
||||
self.cancellation.send(()).ok();
|
||||
// wait until the sender is dropped
|
||||
while self.events_receiver.changed().await.is_ok() {}
|
||||
pub async fn shutdown(self) {
|
||||
match self.join_handle {
|
||||
Some(jh) => {
|
||||
self.cancellation.send(()).ok();
|
||||
match jh.await {
|
||||
Ok(Ok(())) => debug!("Shutdown success"),
|
||||
Ok(Err(e)) => error!("Shutdown task error: {e:?}"),
|
||||
Err(join_error) => {
|
||||
if join_error.is_cancelled() {
|
||||
error!("Shutdown task was cancelled");
|
||||
} else {
|
||||
error!("Shutdown task join error: {join_error}")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
None => {}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -16,10 +16,10 @@ use std::{
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
use crate::task_mgr;
|
||||
use crate::task_mgr::TaskKind;
|
||||
use crate::task_mgr::WALRECEIVER_RUNTIME;
|
||||
use crate::tenant::Timeline;
|
||||
use crate::{task_mgr, walreceiver::TaskStateUpdate};
|
||||
use anyhow::Context;
|
||||
use chrono::{NaiveDateTime, Utc};
|
||||
use etcd_broker::{
|
||||
@@ -145,19 +145,26 @@ async fn connection_manager_loop_step(
|
||||
let wal_connection = walreceiver_state.wal_connection.as_mut()
|
||||
.expect("Should have a connection, as checked by the corresponding select! guard");
|
||||
match wal_connection_update {
|
||||
TaskEvent::Started => {},
|
||||
TaskEvent::NewEvent(status) => {
|
||||
if status.has_processed_wal {
|
||||
// We have advanced last_record_lsn by processing the WAL received
|
||||
// from this safekeeper. This is good enough to clean unsuccessful
|
||||
// retries history and allow reconnecting to this safekeeper without
|
||||
// sleeping for a long time.
|
||||
walreceiver_state.wal_connection_retries.remove(&wal_connection.sk_id);
|
||||
TaskEvent::Update(c) => {
|
||||
match c {
|
||||
TaskStateUpdate::Init | TaskStateUpdate::Started => {},
|
||||
TaskStateUpdate::Progress(status) => {
|
||||
if status.has_processed_wal {
|
||||
// We have advanced last_record_lsn by processing the WAL received
|
||||
// from this safekeeper. This is good enough to clean unsuccessful
|
||||
// retries history and allow reconnecting to this safekeeper without
|
||||
// sleeping for a long time.
|
||||
walreceiver_state.wal_connection_retries.remove(&wal_connection.sk_id);
|
||||
}
|
||||
wal_connection.status = status.to_owned();
|
||||
}
|
||||
}
|
||||
wal_connection.status = status;
|
||||
},
|
||||
TaskEvent::End => {
|
||||
debug!("WAL receiving task finished");
|
||||
TaskEvent::End(walreceiver_task_result) => {
|
||||
match walreceiver_task_result {
|
||||
Ok(()) => debug!("WAL receiving task finished"),
|
||||
Err(e) => error!("wal receiver task finished with an error: {e:?}"),
|
||||
}
|
||||
walreceiver_state.drop_old_connection(false).await;
|
||||
},
|
||||
}
|
||||
@@ -363,13 +370,13 @@ impl WalreceiverState {
|
||||
async move {
|
||||
super::walreceiver_connection::handle_walreceiver_connection(
|
||||
timeline,
|
||||
&new_wal_source_connstr,
|
||||
events_sender.as_ref(),
|
||||
new_wal_source_connstr,
|
||||
events_sender,
|
||||
cancellation,
|
||||
connect_timeout,
|
||||
)
|
||||
.await
|
||||
.map_err(|e| format!("walreceiver connection handling failure: {e:#}"))
|
||||
.context("walreceiver connection handling failure")
|
||||
}
|
||||
.instrument(info_span!("walreceiver_connection", id = %id))
|
||||
});
|
||||
@@ -885,7 +892,7 @@ mod tests {
|
||||
status: connection_status.clone(),
|
||||
connection_task: TaskHandle::spawn(move |sender, _| async move {
|
||||
sender
|
||||
.send(TaskEvent::NewEvent(connection_status.clone()))
|
||||
.send(TaskStateUpdate::Progress(connection_status.clone()))
|
||||
.ok();
|
||||
Ok(())
|
||||
}),
|
||||
@@ -1145,7 +1152,7 @@ mod tests {
|
||||
status: connection_status.clone(),
|
||||
connection_task: TaskHandle::spawn(move |sender, _| async move {
|
||||
sender
|
||||
.send(TaskEvent::NewEvent(connection_status.clone()))
|
||||
.send(TaskStateUpdate::Progress(connection_status.clone()))
|
||||
.ok();
|
||||
Ok(())
|
||||
}),
|
||||
@@ -1233,7 +1240,7 @@ mod tests {
|
||||
status: connection_status.clone(),
|
||||
connection_task: TaskHandle::spawn(move |sender, _| async move {
|
||||
sender
|
||||
.send(TaskEvent::NewEvent(connection_status.clone()))
|
||||
.send(TaskStateUpdate::Progress(connection_status.clone()))
|
||||
.ok();
|
||||
Ok(())
|
||||
}),
|
||||
|
||||
@@ -16,10 +16,9 @@ use postgres_protocol::message::backend::ReplicationMessage;
|
||||
use postgres_types::PgLsn;
|
||||
use tokio::{pin, select, sync::watch, time};
|
||||
use tokio_postgres::{replication::ReplicationStream, Client};
|
||||
use tracing::{debug, error, info, info_span, trace, warn, Instrument};
|
||||
use tracing::{debug, error, info, trace, warn};
|
||||
|
||||
use super::TaskEvent;
|
||||
use crate::metrics::LIVE_CONNECTIONS_COUNT;
|
||||
use crate::{metrics::LIVE_CONNECTIONS_COUNT, walreceiver::TaskStateUpdate};
|
||||
use crate::{
|
||||
task_mgr,
|
||||
task_mgr::TaskKind,
|
||||
@@ -55,8 +54,8 @@ pub struct WalConnectionStatus {
|
||||
/// messages as we go.
|
||||
pub async fn handle_walreceiver_connection(
|
||||
timeline: Arc<Timeline>,
|
||||
wal_source_connstr: &str,
|
||||
events_sender: &watch::Sender<TaskEvent<WalConnectionStatus>>,
|
||||
wal_source_connstr: String,
|
||||
events_sender: watch::Sender<TaskStateUpdate<WalConnectionStatus>>,
|
||||
mut cancellation: watch::Receiver<()>,
|
||||
connect_timeout: Duration,
|
||||
) -> anyhow::Result<()> {
|
||||
@@ -81,7 +80,7 @@ pub async fn handle_walreceiver_connection(
|
||||
streaming_lsn: None,
|
||||
commit_lsn: None,
|
||||
};
|
||||
if let Err(e) = events_sender.send(TaskEvent::NewEvent(connection_status.clone())) {
|
||||
if let Err(e) = events_sender.send(TaskStateUpdate::Progress(connection_status.clone())) {
|
||||
warn!("Wal connection event listener dropped right after connection init, aborting the connection: {e}");
|
||||
return Ok(());
|
||||
}
|
||||
@@ -112,8 +111,7 @@ pub async fn handle_walreceiver_connection(
|
||||
_ = connection_cancellation.changed() => info!("Connection cancelled"),
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
.instrument(info_span!("walreceiver connection")),
|
||||
},
|
||||
);
|
||||
|
||||
// Immediately increment the gauge, then create a job to decrement it on task exit.
|
||||
@@ -134,7 +132,7 @@ pub async fn handle_walreceiver_connection(
|
||||
connection_status.latest_connection_update = Utc::now().naive_utc();
|
||||
connection_status.latest_wal_update = Utc::now().naive_utc();
|
||||
connection_status.commit_lsn = Some(end_of_wal);
|
||||
if let Err(e) = events_sender.send(TaskEvent::NewEvent(connection_status.clone())) {
|
||||
if let Err(e) = events_sender.send(TaskStateUpdate::Progress(connection_status.clone())) {
|
||||
warn!("Wal connection event listener dropped after IDENTIFY_SYSTEM, aborting the connection: {e}");
|
||||
return Ok(());
|
||||
}
|
||||
@@ -202,7 +200,7 @@ pub async fn handle_walreceiver_connection(
|
||||
}
|
||||
&_ => {}
|
||||
};
|
||||
if let Err(e) = events_sender.send(TaskEvent::NewEvent(connection_status.clone())) {
|
||||
if let Err(e) = events_sender.send(TaskStateUpdate::Progress(connection_status.clone())) {
|
||||
warn!("Wal connection event listener dropped, aborting the connection: {e}");
|
||||
return Ok(());
|
||||
}
|
||||
@@ -268,7 +266,8 @@ pub async fn handle_walreceiver_connection(
|
||||
if !connection_status.has_processed_wal && last_rec_lsn > last_rec_lsn_before_msg {
|
||||
// We have successfully processed at least one WAL record.
|
||||
connection_status.has_processed_wal = true;
|
||||
if let Err(e) = events_sender.send(TaskEvent::NewEvent(connection_status.clone())) {
|
||||
if let Err(e) = events_sender.send(TaskStateUpdate::Progress(connection_status.clone()))
|
||||
{
|
||||
warn!("Wal connection event listener dropped, aborting the connection: {e}");
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
@@ -259,3 +259,15 @@ fn parse_host_port(input: &str) -> Option<(&str, u16)> {
|
||||
let (host, port) = input.split_once(':')?;
|
||||
Some((host, port.parse().ok()?))
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_parse_host_port() {
|
||||
let (host, port) = parse_host_port("127.0.0.1:5432").expect("failed to parse");
|
||||
assert_eq!(host, "127.0.0.1");
|
||||
assert_eq!(port, 5432);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -54,13 +54,10 @@ impl<'a> ClientCredentials<'a> {
|
||||
let dbname = get_param("database")?;
|
||||
|
||||
// Project name might be passed via PG's command-line options.
|
||||
let project_a = params.options_raw().and_then(|options| {
|
||||
for opt in options {
|
||||
if let Some(value) = opt.strip_prefix("project=") {
|
||||
return Some(Cow::Borrowed(value));
|
||||
}
|
||||
}
|
||||
None
|
||||
let project_a = params.options_raw().and_then(|mut options| {
|
||||
options
|
||||
.find_map(|opt| opt.strip_prefix("project="))
|
||||
.map(Cow::Borrowed)
|
||||
});
|
||||
|
||||
// Alternative project name is in fact a subdomain from SNI.
|
||||
|
||||
@@ -52,6 +52,16 @@ impl CancelMap {
|
||||
let session = Session::new(key, self);
|
||||
f(session).await
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
fn contains(&self, session: &Session) -> bool {
|
||||
self.0.lock().contains_key(&session.key)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
fn is_empty(&self) -> bool {
|
||||
self.0.lock().is_empty()
|
||||
}
|
||||
}
|
||||
|
||||
/// This should've been a [`std::future::Future`], but
|
||||
@@ -104,3 +114,39 @@ impl<'a> Session<'a> {
|
||||
self.key
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use once_cell::sync::Lazy;
|
||||
|
||||
#[tokio::test]
|
||||
async fn check_session_drop() -> anyhow::Result<()> {
|
||||
static CANCEL_MAP: Lazy<CancelMap> = Lazy::new(Default::default);
|
||||
|
||||
let (tx, rx) = tokio::sync::oneshot::channel();
|
||||
let task = tokio::spawn(CANCEL_MAP.with_session(|session| async move {
|
||||
assert!(CANCEL_MAP.contains(&session));
|
||||
|
||||
tx.send(()).expect("failed to send");
|
||||
let () = futures::future::pending().await; // sleep forever
|
||||
|
||||
Ok(())
|
||||
}));
|
||||
|
||||
// Wait until the task has been spawned.
|
||||
let () = rx.await.context("failed to hear from the task")?;
|
||||
|
||||
// Drop the session's entry by cancelling the task.
|
||||
task.abort();
|
||||
let error = task.await.expect_err("task should have failed");
|
||||
if !error.is_cancelled() {
|
||||
anyhow::bail!(error);
|
||||
}
|
||||
|
||||
// Check that the session has been dropped.
|
||||
assert!(CANCEL_MAP.is_empty());
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
//! Small parsing helpers.
|
||||
|
||||
use std::convert::TryInto;
|
||||
use std::ffi::CStr;
|
||||
|
||||
pub fn split_cstr(bytes: &[u8]) -> Option<(&CStr, &[u8])> {
|
||||
@@ -10,9 +9,36 @@ pub fn split_cstr(bytes: &[u8]) -> Option<(&CStr, &[u8])> {
|
||||
Some((unsafe { CStr::from_bytes_with_nul_unchecked(cstr) }, other))
|
||||
}
|
||||
|
||||
/// See <https://doc.rust-lang.org/std/primitive.slice.html#method.split_array_ref>.
|
||||
pub fn split_at_const<const N: usize>(bytes: &[u8]) -> Option<(&[u8; N], &[u8])> {
|
||||
(bytes.len() >= N).then(|| {
|
||||
let (head, tail) = bytes.split_at(N);
|
||||
(head.try_into().unwrap(), tail)
|
||||
})
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_split_cstr() {
|
||||
assert!(split_cstr(b"").is_none());
|
||||
assert!(split_cstr(b"foo").is_none());
|
||||
|
||||
let (cstr, rest) = split_cstr(b"\0").expect("uh-oh");
|
||||
assert_eq!(cstr.to_bytes(), b"");
|
||||
assert_eq!(rest, b"");
|
||||
|
||||
let (cstr, rest) = split_cstr(b"foo\0bar").expect("uh-oh");
|
||||
assert_eq!(cstr.to_bytes(), b"foo");
|
||||
assert_eq!(rest, b"bar");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_split_at_const() {
|
||||
assert!(split_at_const::<0>(b"").is_some());
|
||||
assert!(split_at_const::<1>(b"").is_none());
|
||||
assert!(matches!(split_at_const::<1>(b"ok"), Some((b"o", b"k"))));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -248,6 +248,18 @@ pub fn upgrade_control_file(buf: &[u8], version: u32) -> Result<SafeKeeperState>
|
||||
oldstate.timeline_start_lsn = Lsn(1);
|
||||
oldstate.local_start_lsn = Lsn(1);
|
||||
|
||||
return Ok(oldstate);
|
||||
} else if version == 6 {
|
||||
info!("reading safekeeper control file version {}", version);
|
||||
let mut oldstate = SafeKeeperState::des(&buf[..buf.len()])?;
|
||||
if oldstate.server.pg_version != 0 {
|
||||
return Ok(oldstate);
|
||||
}
|
||||
|
||||
// set pg_version to the default v14
|
||||
info!("setting pg_version to 140005");
|
||||
oldstate.server.pg_version = 140005;
|
||||
|
||||
return Ok(oldstate);
|
||||
}
|
||||
bail!("unsupported safekeeper control file version {}", version)
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
|
||||
use std::time::{Instant, SystemTime};
|
||||
|
||||
use ::metrics::{register_histogram, GaugeVec, Histogram, DISK_WRITE_SECONDS_BUCKETS};
|
||||
use ::metrics::{register_histogram, GaugeVec, Histogram, IntGauge, DISK_WRITE_SECONDS_BUCKETS};
|
||||
use anyhow::Result;
|
||||
use metrics::{
|
||||
core::{AtomicU64, Collector, Desc, GenericGaugeVec, Opts},
|
||||
@@ -135,6 +135,7 @@ pub struct TimelineCollector {
|
||||
written_wal_seconds: GaugeVec,
|
||||
flushed_wal_seconds: GaugeVec,
|
||||
collect_timeline_metrics: Gauge,
|
||||
timelines_count: IntGauge,
|
||||
}
|
||||
|
||||
impl Default for TimelineCollector {
|
||||
@@ -311,6 +312,13 @@ impl TimelineCollector {
|
||||
.unwrap();
|
||||
descs.extend(collect_timeline_metrics.desc().into_iter().cloned());
|
||||
|
||||
let timelines_count = IntGauge::new(
|
||||
"safekeeper_timelines",
|
||||
"Total number of timelines loaded in-memory",
|
||||
)
|
||||
.unwrap();
|
||||
descs.extend(timelines_count.desc().into_iter().cloned());
|
||||
|
||||
TimelineCollector {
|
||||
descs,
|
||||
commit_lsn,
|
||||
@@ -330,6 +338,7 @@ impl TimelineCollector {
|
||||
written_wal_seconds,
|
||||
flushed_wal_seconds,
|
||||
collect_timeline_metrics,
|
||||
timelines_count,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -361,6 +370,7 @@ impl Collector for TimelineCollector {
|
||||
self.flushed_wal_seconds.reset();
|
||||
|
||||
let timelines = GlobalTimelines::get_all();
|
||||
let timelines_count = timelines.len();
|
||||
|
||||
for arc_tli in timelines {
|
||||
let tli = arc_tli.info_for_metrics();
|
||||
@@ -474,6 +484,10 @@ impl Collector for TimelineCollector {
|
||||
self.collect_timeline_metrics.set(elapsed);
|
||||
mfs.extend(self.collect_timeline_metrics.collect());
|
||||
|
||||
// report total number of timelines
|
||||
self.timelines_count.set(timelines_count as i64);
|
||||
mfs.extend(self.timelines_count.collect());
|
||||
|
||||
mfs
|
||||
}
|
||||
}
|
||||
|
||||
@@ -25,7 +25,7 @@ use utils::{
|
||||
};
|
||||
|
||||
pub const SK_MAGIC: u32 = 0xcafeceefu32;
|
||||
pub const SK_FORMAT_VERSION: u32 = 6;
|
||||
pub const SK_FORMAT_VERSION: u32 = 7;
|
||||
const SK_PROTOCOL_VERSION: u32 = 2;
|
||||
pub const UNKNOWN_SERVER_VERSION: u32 = 0;
|
||||
|
||||
@@ -639,7 +639,6 @@ where
|
||||
|
||||
let mut state = self.state.clone();
|
||||
state.server.system_id = msg.system_id;
|
||||
state.server.wal_seg_size = msg.wal_seg_size;
|
||||
if msg.pg_version != UNKNOWN_SERVER_VERSION {
|
||||
state.server.pg_version = msg.pg_version;
|
||||
}
|
||||
@@ -830,10 +829,6 @@ where
|
||||
self.epoch_start_lsn = msg.h.epoch_start_lsn;
|
||||
self.inmem.proposer_uuid = msg.h.proposer_uuid;
|
||||
|
||||
// bootstrap the decoder, if not yet
|
||||
self.wal_store
|
||||
.init_decoder(self.state.server.pg_version / 10000, self.state.commit_lsn)?;
|
||||
|
||||
// do the job
|
||||
if !msg.wal_data.is_empty() {
|
||||
self.wal_store.write_wal(msg.h.begin_lsn, &msg.wal_data)?;
|
||||
@@ -990,10 +985,6 @@ mod tests {
|
||||
}
|
||||
|
||||
impl wal_storage::Storage for DummyWalStore {
|
||||
fn init_decoder(&mut self, _pg_majorversion: u32, _commit_lsn: Lsn) -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn flush_lsn(&self) -> Lsn {
|
||||
self.lsn
|
||||
}
|
||||
|
||||
@@ -314,6 +314,8 @@ impl Timeline {
|
||||
ttid: TenantTimelineId,
|
||||
wal_backup_launcher_tx: Sender<TenantTimelineId>,
|
||||
) -> Result<Timeline> {
|
||||
let _enter = info_span!("load_timeline", timeline = %ttid.timeline_id).entered();
|
||||
|
||||
let shared_state = SharedState::restore(&conf, &ttid)?;
|
||||
let (commit_lsn_watch_tx, commit_lsn_watch_rx) =
|
||||
watch::channel(shared_state.sk.state.commit_lsn);
|
||||
|
||||
@@ -38,8 +38,6 @@ use postgres_ffi::waldecoder::WalStreamDecoder;
|
||||
use tokio::io::{AsyncReadExt, AsyncSeekExt};
|
||||
|
||||
pub trait Storage {
|
||||
// Bootstrap the wal decoder with correct pg_version
|
||||
fn init_decoder(&mut self, pg_majorversion: u32, commit_lsn: Lsn) -> Result<()>;
|
||||
/// LSN of last durably stored WAL record.
|
||||
fn flush_lsn(&self) -> Lsn;
|
||||
|
||||
@@ -89,8 +87,7 @@ pub struct PhysicalStorage {
|
||||
flush_record_lsn: Lsn,
|
||||
|
||||
/// Decoder is required for detecting boundaries of WAL records.
|
||||
/// None until it is initialized
|
||||
decoder: Option<WalStreamDecoder>,
|
||||
decoder: WalStreamDecoder,
|
||||
|
||||
/// Cached open file for the last segment.
|
||||
///
|
||||
@@ -114,19 +111,33 @@ impl PhysicalStorage {
|
||||
|
||||
// Find out where stored WAL ends, starting at commit_lsn which is a
|
||||
// known recent record boundary (unless we don't have WAL at all).
|
||||
//
|
||||
// NB: find_end_of_wal MUST be backwards compatible with the previously
|
||||
// written WAL. If find_end_of_wal fails to read any WAL written by an
|
||||
// older version of the code, we could lose data forever.
|
||||
let write_lsn = if state.commit_lsn == Lsn(0) {
|
||||
Lsn(0)
|
||||
} else {
|
||||
// FIXME What would be the correct value here, if we can not
|
||||
// call find_end_of_wal yet, because we don't know pg_version?
|
||||
state.commit_lsn
|
||||
match state.server.pg_version / 10000 {
|
||||
14 => postgres_ffi::v14::xlog_utils::find_end_of_wal(
|
||||
&timeline_dir,
|
||||
wal_seg_size,
|
||||
state.commit_lsn,
|
||||
)?,
|
||||
15 => postgres_ffi::v15::xlog_utils::find_end_of_wal(
|
||||
&timeline_dir,
|
||||
wal_seg_size,
|
||||
state.commit_lsn,
|
||||
)?,
|
||||
_ => bail!("unsupported postgres version: {}", state.server.pg_version),
|
||||
}
|
||||
};
|
||||
|
||||
// TODO: do we really know that write_lsn is fully flushed to disk?
|
||||
// If not, maybe it's better to call fsync() here to be sure?
|
||||
let flush_lsn = write_lsn;
|
||||
|
||||
info!(
|
||||
debug!(
|
||||
"initialized storage for timeline {}, flush_lsn={}, commit_lsn={}, peer_horizon_lsn={}",
|
||||
ttid.timeline_id, flush_lsn, state.commit_lsn, state.peer_horizon_lsn,
|
||||
);
|
||||
@@ -142,7 +153,7 @@ impl PhysicalStorage {
|
||||
write_lsn,
|
||||
write_record_lsn: write_lsn,
|
||||
flush_record_lsn: flush_lsn,
|
||||
decoder: None,
|
||||
decoder: WalStreamDecoder::new(write_lsn, state.server.pg_version / 10000),
|
||||
file: None,
|
||||
})
|
||||
}
|
||||
@@ -257,42 +268,6 @@ impl PhysicalStorage {
|
||||
}
|
||||
|
||||
impl Storage for PhysicalStorage {
|
||||
fn init_decoder(&mut self, pg_majorversion: u32, commit_lsn: Lsn) -> Result<()> {
|
||||
if self.decoder.is_some() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
info!(
|
||||
"init_decoder for pg_version {} and commit_lsn {}",
|
||||
pg_majorversion, commit_lsn
|
||||
);
|
||||
|
||||
let write_lsn = match pg_majorversion {
|
||||
14 => postgres_ffi::v14::xlog_utils::find_end_of_wal(
|
||||
&self.timeline_dir,
|
||||
self.wal_seg_size,
|
||||
commit_lsn,
|
||||
)?,
|
||||
15 => postgres_ffi::v15::xlog_utils::find_end_of_wal(
|
||||
&self.timeline_dir,
|
||||
self.wal_seg_size,
|
||||
commit_lsn,
|
||||
)?,
|
||||
_ => bail!("unsupported postgres version"),
|
||||
};
|
||||
|
||||
info!(
|
||||
"init_decoder for pg_version {} and commit_lsn {}. write_lsn = {}",
|
||||
pg_majorversion, commit_lsn, write_lsn
|
||||
);
|
||||
|
||||
self.decoder = Some(WalStreamDecoder::new(write_lsn, pg_majorversion));
|
||||
self.flush_record_lsn = write_lsn;
|
||||
self.write_record_lsn = write_lsn;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// flush_lsn returns LSN of last durably stored WAL record.
|
||||
fn flush_lsn(&self) -> Lsn {
|
||||
self.flush_record_lsn
|
||||
@@ -324,18 +299,18 @@ impl Storage for PhysicalStorage {
|
||||
|
||||
// figure out last record's end lsn for reporting (if we got the
|
||||
// whole record)
|
||||
if self.decoder.as_ref().unwrap().available() != startpos {
|
||||
if self.decoder.available() != startpos {
|
||||
info!(
|
||||
"restart decoder from {} to {}",
|
||||
self.decoder.as_ref().unwrap().available(),
|
||||
self.decoder.available(),
|
||||
startpos,
|
||||
);
|
||||
let pg_version = self.decoder.as_ref().unwrap().pg_version;
|
||||
self.decoder = Some(WalStreamDecoder::new(startpos, pg_version));
|
||||
let pg_version = self.decoder.pg_version;
|
||||
self.decoder = WalStreamDecoder::new(startpos, pg_version);
|
||||
}
|
||||
self.decoder.as_mut().unwrap().feed_bytes(buf);
|
||||
self.decoder.feed_bytes(buf);
|
||||
loop {
|
||||
match self.decoder.as_mut().unwrap().poll_decode()? {
|
||||
match self.decoder.poll_decode()? {
|
||||
None => break, // no full record yet
|
||||
Some((lsn, _rec)) => {
|
||||
self.write_record_lsn = lsn;
|
||||
|
||||
@@ -710,8 +710,8 @@ if __name__ == "__main__":
|
||||
"--psql-path",
|
||||
dest="psql_path",
|
||||
required=False,
|
||||
default="/usr/local/bin/psql",
|
||||
help="Path to the psql binary. Default: /usr/local/bin/psql",
|
||||
default="/usr/local/v14/bin/psql",
|
||||
help="Path to the psql binary. Default: /usr/local/v14/bin/psql",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--only-import",
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
import logging
|
||||
import logging.config
|
||||
import re
|
||||
|
||||
"""
|
||||
This file configures logging to use in python tests.
|
||||
@@ -30,17 +29,6 @@ LOGGING = {
|
||||
}
|
||||
|
||||
|
||||
class PasswordFilter(logging.Filter):
|
||||
"""Filter out password from logs."""
|
||||
|
||||
# Good enough to filter our passwords produced by PgProtocol.connstr
|
||||
FILTER = re.compile(r"(\s*)password=[^\s]+(\s*)")
|
||||
|
||||
def filter(self, record: logging.LogRecord) -> bool:
|
||||
record.msg = self.FILTER.sub(r"\1password=<hidden>\2", str(record.msg))
|
||||
return True
|
||||
|
||||
|
||||
def getLogger(name="root") -> logging.Logger:
|
||||
"""Method to get logger for tests.
|
||||
|
||||
@@ -50,6 +38,5 @@ def getLogger(name="root") -> logging.Logger:
|
||||
|
||||
# default logger for tests
|
||||
log = getLogger()
|
||||
log.addFilter(PasswordFilter())
|
||||
|
||||
logging.config.dictConfig(LOGGING)
|
||||
|
||||
@@ -283,10 +283,15 @@ class PgProtocol:
|
||||
return str(make_dsn(**self.conn_options(**kwargs)))
|
||||
|
||||
def conn_options(self, **kwargs):
|
||||
"""
|
||||
Construct a dictionary of connection options from default values and extra parameters.
|
||||
An option can be dropped from the returning dictionary by None-valued extra parameter.
|
||||
"""
|
||||
result = self.default_options.copy()
|
||||
if "dsn" in kwargs:
|
||||
result.update(parse_dsn(kwargs["dsn"]))
|
||||
result.update(kwargs)
|
||||
result = {k: v for k, v in result.items() if v is not None}
|
||||
|
||||
# Individual statement timeout in seconds. 2 minutes should be
|
||||
# enough for our tests, but if you need a longer, you can
|
||||
|
||||
@@ -4,7 +4,7 @@ import os
|
||||
import timeit
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from typing import List
|
||||
from typing import Dict, List
|
||||
|
||||
import pytest
|
||||
from fixtures.benchmark_fixture import MetricReport, PgBenchInitResult, PgBenchRunResult
|
||||
@@ -24,14 +24,18 @@ def utc_now_timestamp() -> int:
|
||||
return calendar.timegm(datetime.utcnow().utctimetuple())
|
||||
|
||||
|
||||
def init_pgbench(env: PgCompare, cmdline):
|
||||
def init_pgbench(env: PgCompare, cmdline, password: None):
|
||||
environ: Dict[str, str] = {}
|
||||
if password is not None:
|
||||
environ["PGPASSWORD"] = password
|
||||
|
||||
# calculate timestamps and durations separately
|
||||
# timestamp is intended to be used for linking to grafana and logs
|
||||
# duration is actually a metric and uses float instead of int for timestamp
|
||||
start_timestamp = utc_now_timestamp()
|
||||
t0 = timeit.default_timer()
|
||||
with env.record_pageserver_writes("init.pageserver_writes"):
|
||||
out = env.pg_bin.run_capture(cmdline)
|
||||
out = env.pg_bin.run_capture(cmdline, env=environ)
|
||||
env.flush()
|
||||
|
||||
duration = timeit.default_timer() - t0
|
||||
@@ -48,13 +52,15 @@ def init_pgbench(env: PgCompare, cmdline):
|
||||
env.zenbenchmark.record_pg_bench_init_result("init", res)
|
||||
|
||||
|
||||
def run_pgbench(env: PgCompare, prefix: str, cmdline):
|
||||
def run_pgbench(env: PgCompare, prefix: str, cmdline, password: None):
|
||||
environ: Dict[str, str] = {}
|
||||
if password is not None:
|
||||
environ["PGPASSWORD"] = password
|
||||
|
||||
with env.record_pageserver_writes(f"{prefix}.pageserver_writes"):
|
||||
run_start_timestamp = utc_now_timestamp()
|
||||
t0 = timeit.default_timer()
|
||||
out = env.pg_bin.run_capture(
|
||||
cmdline,
|
||||
)
|
||||
out = env.pg_bin.run_capture(cmdline, env=environ)
|
||||
run_duration = timeit.default_timer() - t0
|
||||
run_end_timestamp = utc_now_timestamp()
|
||||
env.flush()
|
||||
@@ -82,10 +88,14 @@ def run_pgbench(env: PgCompare, prefix: str, cmdline):
|
||||
def run_test_pgbench(env: PgCompare, scale: int, duration: int, workload_type: PgBenchLoadType):
|
||||
env.zenbenchmark.record("scale", scale, "", MetricReport.TEST_PARAM)
|
||||
|
||||
password = env.pg.default_options.get("password", None)
|
||||
options = "-cstatement_timeout=1h " + env.pg.default_options.get("options", "")
|
||||
# drop password from the connection string by passing password=None and set password separately
|
||||
connstr = env.pg.connstr(password=None, options=options)
|
||||
|
||||
if workload_type == PgBenchLoadType.INIT:
|
||||
# Run initialize
|
||||
options = "-cstatement_timeout=1h " + env.pg.default_options.get("options", "")
|
||||
init_pgbench(env, ["pgbench", f"-s{scale}", "-i", env.pg.connstr(options=options)])
|
||||
init_pgbench(env, ["pgbench", f"-s{scale}", "-i", connstr], password=password)
|
||||
|
||||
if workload_type == PgBenchLoadType.SIMPLE_UPDATE:
|
||||
# Run simple-update workload
|
||||
@@ -99,8 +109,9 @@ def run_test_pgbench(env: PgCompare, scale: int, duration: int, workload_type: P
|
||||
f"-T{duration}",
|
||||
"-P2",
|
||||
"--progress-timestamp",
|
||||
env.pg.connstr(),
|
||||
connstr,
|
||||
],
|
||||
password=password,
|
||||
)
|
||||
|
||||
if workload_type == PgBenchLoadType.SELECT_ONLY:
|
||||
@@ -115,8 +126,9 @@ def run_test_pgbench(env: PgCompare, scale: int, duration: int, workload_type: P
|
||||
f"-T{duration}",
|
||||
"-P2",
|
||||
"--progress-timestamp",
|
||||
env.pg.connstr(),
|
||||
connstr,
|
||||
],
|
||||
password=password,
|
||||
)
|
||||
|
||||
env.report_size()
|
||||
|
||||
Reference in New Issue
Block a user