Compare commits

...

5 Commits

Author SHA1 Message Date
Conrad Ludgate
5019c7eb2c fix: dont hard error on initial redis failure 2025-01-30 10:26:48 +00:00
Alex Chi Z.
9dff6cc2a4 fix(pageserver): skip repartition if we need L0 compaction (#10547)
## Problem

Repartition is slow, but it's only used in image layer creation. We can
skip it if we have a lot of L0 layers to ingest.

## Summary of changes

If L0 compaction is not complete, do not repartition and do not create
image layers.

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-01-29 21:32:50 +00:00
Erik Grinaker
ff298afb97 pageserver: add level for timeline layer metrics (#10563)
## Problem

We don't have good observability for per-timeline compaction debt,
specifically the number of delta layers in the frozen, L0, and L1
levels.

Touches https://github.com/neondatabase/cloud/issues/23283.

## Summary of changes

* Add a `level` label for `pageserver_layer_{count,size}` with values
`l0`, `l1`, and `frozen`.
* Track metrics for frozen layers.

There is already a `kind={delta,image}` label. `kind=image` is only
possible for `level=l1`.

We don't include the currently open ephemeral layer, only frozen layers.
There is always exactly 1 ephemeral layer, with a dynamic size which is
already tracked in `pageserver_timeline_ephemeral_bytes`.
2025-01-29 21:10:56 +00:00
Fedor Dikarev
de1c35fab3 add retries for apt, wget and curl (#10553)
Ref: https://github.com/neondatabase/cloud/issues/23461

## Problem
> recent CI failure due to apt-get:
```
4.266 E: Failed to fetch http://deb.debian.org/debian/pool/main/g/gcc-10/libgfortran5_10.2.1-6_arm64.deb  Error reading from server - read (104: Connection reset by peer) [IP: 146.75.122.132 80]
```

https://github.com/neondatabase/neon/actions/runs/11144974698/job/30973537767?pr=9186
thinking about if there should be a mirror-selector at the beginning of
the dockerfile so that it uses a debian mirror closer to the build
server?
## Summary of changes
We could consider adding local mirror or proxy and keep it close to our
self-hosted runners.
For now lets just add retries for `apt`, `wget` and `curl`

thanks to @skyzh for reporting that in October 2024, I just finally
found time to take a look here :)
2025-01-29 21:02:54 +00:00
Peter Bendel
62819aca36 Add PostgreSQL version 17 benchmarks (#10536)
## Problem

benchmarking.yml so far is only running benchmarks with PostgreSQL
version 16.
However neon recently changed the default for new customers to
PostgreSQL version 17.

See related [epic](https://github.com/neondatabase/cloud/issues/23295)

## Summary of changes

We do not want to run every job step with both pg 16 and 17 because this
would need excessive resources (runners, computes) and extend the
benchmarking run wall clock time too much.

So we select an opinionated subset of testcases that we also report in
weekly reporting and add a postgres v17 job step.

For re-use projects associated Neon projects have been created and
connection strings have been added to neon database organization
secrets.

A follow up is to add the reporting for these new runs to some grafana
dashboards.
2025-01-29 20:21:42 +00:00
14 changed files with 343 additions and 322 deletions

View File

@@ -17,7 +17,7 @@ jobs:
strategy:
fail-fast: false
matrix:
platform: [ aws-rds-postgres, aws-aurora-serverless-v2-postgres, neon ]
platform: [ aws-rds-postgres, aws-aurora-serverless-v2-postgres, neon, neon_pg17 ]
database: [ clickbench, tpch, userexample ]
env:
@@ -41,6 +41,9 @@ jobs:
neon)
CONNSTR=${{ secrets.BENCHMARK_CAPTEST_CONNSTR }}
;;
neon_pg17)
CONNSTR=${{ secrets.BENCHMARK_CAPTEST_CONNSTR_PG17 }}
;;
aws-rds-postgres)
CONNSTR=${{ secrets.BENCHMARK_RDS_POSTGRES_CONNSTR }}
;;

View File

@@ -63,11 +63,15 @@ jobs:
fail-fast: false
matrix:
include:
- DEFAULT_PG_VERSION: 16
- PG_VERSION: 16
PLATFORM: "neon-staging"
region_id: ${{ github.event.inputs.region_id || 'aws-us-east-2' }}
RUNNER: [ self-hosted, us-east-2, x64 ]
- DEFAULT_PG_VERSION: 16
- PG_VERSION: 17
PLATFORM: "neon-staging"
region_id: ${{ github.event.inputs.region_id || 'aws-us-east-2' }}
RUNNER: [ self-hosted, us-east-2, x64 ]
- PG_VERSION: 16
PLATFORM: "azure-staging"
region_id: 'azure-eastus2'
RUNNER: [ self-hosted, eastus2, x64 ]
@@ -75,7 +79,7 @@ jobs:
TEST_PG_BENCH_DURATIONS_MATRIX: "300"
TEST_PG_BENCH_SCALES_MATRIX: "10,100"
POSTGRES_DISTRIB_DIR: /tmp/neon/pg_install
DEFAULT_PG_VERSION: ${{ matrix.DEFAULT_PG_VERSION }}
PG_VERSION: ${{ matrix.PG_VERSION }}
TEST_OUTPUT: /tmp/test_output
BUILD_TYPE: remote
SAVE_PERF_REPORT: ${{ github.event.inputs.save_perf_report || ( github.ref_name == 'main' ) }}
@@ -112,7 +116,7 @@ jobs:
uses: ./.github/actions/neon-project-create
with:
region_id: ${{ matrix.region_id }}
postgres_version: ${{ env.DEFAULT_PG_VERSION }}
postgres_version: ${{ env.PG_VERSION }}
api_key: ${{ secrets.NEON_STAGING_API_KEY }}
- name: Run benchmark
@@ -122,7 +126,7 @@ jobs:
test_selection: performance
run_in_parallel: false
save_perf_report: ${{ env.SAVE_PERF_REPORT }}
pg_version: ${{ env.DEFAULT_PG_VERSION }}
pg_version: ${{ env.PG_VERSION }}
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
# Set --sparse-ordering option of pytest-order plugin
# to ensure tests are running in order of appears in the file.
@@ -313,7 +317,11 @@ jobs:
{ "pg_version": 16, "region_id": "azure-eastus2", "platform": "neonvm-azure-captest-freetier", "db_size": "3gb" ,"runner": '"$runner_azure"', "image": "neondatabase/build-tools:pinned-bookworm" },
{ "pg_version": 16, "region_id": "azure-eastus2", "platform": "neonvm-azure-captest-new", "db_size": "10gb","runner": '"$runner_azure"', "image": "neondatabase/build-tools:pinned-bookworm" },
{ "pg_version": 16, "region_id": "azure-eastus2", "platform": "neonvm-azure-captest-new", "db_size": "50gb","runner": '"$runner_azure"', "image": "neondatabase/build-tools:pinned-bookworm" },
{ "pg_version": 16, "region_id": "'"$region_id_default"'", "platform": "neonvm-captest-sharding-reuse", "db_size": "50gb","runner": '"$runner_default"', "image": "'"$image_default"'" }]
{ "pg_version": 16, "region_id": "'"$region_id_default"'", "platform": "neonvm-captest-sharding-reuse", "db_size": "50gb","runner": '"$runner_default"', "image": "'"$image_default"'" },
{ "pg_version": 17, "region_id": "'"$region_id_default"'", "platform": "neonvm-captest-freetier", "db_size": "3gb" ,"runner": '"$runner_default"', "image": "'"$image_default"'" },
{ "pg_version": 17, "region_id": "'"$region_id_default"'", "platform": "neonvm-captest-new", "db_size": "10gb","runner": '"$runner_default"', "image": "'"$image_default"'" },
{ "pg_version": 17, "region_id": "'"$region_id_default"'", "platform": "neonvm-captest-new-many-tables","db_size": "10gb","runner": '"$runner_default"', "image": "'"$image_default"'" },
{ "pg_version": 17, "region_id": "'"$region_id_default"'", "platform": "neonvm-captest-new", "db_size": "50gb","runner": '"$runner_default"', "image": "'"$image_default"'" }]
}'
if [ "$(date +%A)" = "Saturday" ] || [ ${RUN_AWS_RDS_AND_AURORA} = "true" ]; then
@@ -329,12 +337,15 @@ jobs:
matrix='{
"platform": [
"neonvm-captest-reuse"
]
],
"pg_version" : [
16,17
],
}'
if [ "$(date +%A)" = "Saturday" ] || [ ${RUN_AWS_RDS_AND_AURORA} = "true" ]; then
matrix=$(echo "$matrix" | jq '.include += [{ "platform": "rds-postgres" },
{ "platform": "rds-aurora" }]')
matrix=$(echo "$matrix" | jq '.include += [{ "pg_version": 16, "platform": "rds-postgres" },
{ "pg_version": 16, "platform": "rds-aurora" }]')
fi
echo "matrix=$(echo "$matrix" | jq --compact-output '.')" >> $GITHUB_OUTPUT
@@ -346,14 +357,14 @@ jobs:
"platform": [
"neonvm-captest-reuse"
],
"scale": [
"10"
"pg_version" : [
16,17
]
}'
if [ "$(date +%A)" = "Saturday" ] || [ ${RUN_AWS_RDS_AND_AURORA} = "true" ]; then
matrix=$(echo "$matrix" | jq '.include += [{ "platform": "rds-postgres", "scale": "10" },
{ "platform": "rds-aurora", "scale": "10" }]')
matrix=$(echo "$matrix" | jq '.include += [{ "pg_version": 16, "platform": "rds-postgres" },
{ "pg_version": 16, "platform": "rds-aurora" }]')
fi
echo "matrix=$(echo "$matrix" | jq --compact-output '.')" >> $GITHUB_OUTPUT
@@ -378,7 +389,7 @@ jobs:
TEST_PG_BENCH_DURATIONS_MATRIX: "60m"
TEST_PG_BENCH_SCALES_MATRIX: ${{ matrix.db_size }}
POSTGRES_DISTRIB_DIR: /tmp/neon/pg_install
DEFAULT_PG_VERSION: ${{ matrix.pg_version }}
PG_VERSION: ${{ matrix.pg_version }}
TEST_OUTPUT: /tmp/test_output
BUILD_TYPE: remote
SAVE_PERF_REPORT: ${{ github.event.inputs.save_perf_report || ( github.ref_name == 'main' ) }}
@@ -416,7 +427,7 @@ jobs:
uses: ./.github/actions/neon-project-create
with:
region_id: ${{ matrix.region_id }}
postgres_version: ${{ env.DEFAULT_PG_VERSION }}
postgres_version: ${{ env.PG_VERSION }}
api_key: ${{ secrets.NEON_STAGING_API_KEY }}
compute_units: ${{ (contains(matrix.platform, 'captest-freetier') && '[0.25, 0.25]') || '[1, 1]' }}
@@ -459,7 +470,7 @@ jobs:
run_in_parallel: false
save_perf_report: ${{ env.SAVE_PERF_REPORT }}
extra_params: -m remote_cluster --timeout 21600 -k test_perf_many_relations
pg_version: ${{ env.DEFAULT_PG_VERSION }}
pg_version: ${{ env.PG_VERSION }}
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
env:
BENCHMARK_CONNSTR: ${{ steps.set-up-connstr.outputs.connstr }}
@@ -475,7 +486,7 @@ jobs:
run_in_parallel: false
save_perf_report: ${{ env.SAVE_PERF_REPORT }}
extra_params: -m remote_cluster --timeout 21600 -k test_pgbench_remote_init
pg_version: ${{ env.DEFAULT_PG_VERSION }}
pg_version: ${{ env.PG_VERSION }}
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
env:
BENCHMARK_CONNSTR: ${{ steps.set-up-connstr.outputs.connstr }}
@@ -490,7 +501,7 @@ jobs:
run_in_parallel: false
save_perf_report: ${{ env.SAVE_PERF_REPORT }}
extra_params: -m remote_cluster --timeout 21600 -k test_pgbench_remote_simple_update
pg_version: ${{ env.DEFAULT_PG_VERSION }}
pg_version: ${{ env.PG_VERSION }}
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
env:
BENCHMARK_CONNSTR: ${{ steps.set-up-connstr.outputs.connstr }}
@@ -505,7 +516,7 @@ jobs:
run_in_parallel: false
save_perf_report: ${{ env.SAVE_PERF_REPORT }}
extra_params: -m remote_cluster --timeout 21600 -k test_pgbench_remote_select_only
pg_version: ${{ env.DEFAULT_PG_VERSION }}
pg_version: ${{ env.PG_VERSION }}
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
env:
BENCHMARK_CONNSTR: ${{ steps.set-up-connstr.outputs.connstr }}
@@ -549,14 +560,19 @@ jobs:
include:
- PLATFORM: "neonvm-captest-pgvector"
RUNNER: [ self-hosted, us-east-2, x64 ]
postgres_version: 16
- PLATFORM: "neonvm-captest-pgvector-pg17"
RUNNER: [ self-hosted, us-east-2, x64 ]
postgres_version: 17
- PLATFORM: "azure-captest-pgvector"
RUNNER: [ self-hosted, eastus2, x64 ]
postgres_version: 16
env:
TEST_PG_BENCH_DURATIONS_MATRIX: "15m"
TEST_PG_BENCH_SCALES_MATRIX: "1"
POSTGRES_DISTRIB_DIR: /tmp/neon/pg_install
DEFAULT_PG_VERSION: 16
PG_VERSION: ${{ matrix.postgres_version }}
TEST_OUTPUT: /tmp/test_output
BUILD_TYPE: remote
@@ -590,9 +606,13 @@ jobs:
dpkg -x postgresql-client-16_16.6-1.pgdg120+1_${arch}.deb pg
mkdir -p /tmp/neon/pg_install/v16/bin
mkdir -p /tmp/neon/pg_install/v17/bin
ln -s /home/nonroot/pg/usr/lib/postgresql/16/bin/pgbench /tmp/neon/pg_install/v16/bin/pgbench
ln -s /home/nonroot/pg/usr/lib/postgresql/16/bin/psql /tmp/neon/pg_install/v16/bin/psql
ln -s /home/nonroot/pg/usr/lib/$(uname -m)-linux-gnu /tmp/neon/pg_install/v16/lib
ln -s /home/nonroot/pg/usr/lib/postgresql/16/bin/pgbench /tmp/neon/pg_install/v17/bin/pgbench
ln -s /home/nonroot/pg/usr/lib/postgresql/16/bin/psql /tmp/neon/pg_install/v17/bin/psql
ln -s /home/nonroot/pg/usr/lib/$(uname -m)-linux-gnu /tmp/neon/pg_install/v17/lib
LD_LIBRARY_PATH="/home/nonroot/pg/usr/lib/$(uname -m)-linux-gnu:${LD_LIBRARY_PATH:-}"
export LD_LIBRARY_PATH
@@ -608,6 +628,9 @@ jobs:
neonvm-captest-pgvector)
CONNSTR=${{ secrets.BENCHMARK_PGVECTOR_CONNSTR }}
;;
neonvm-captest-pgvector-pg17)
CONNSTR=${{ secrets.BENCHMARK_PGVECTOR_CONNSTR_PG17 }}
;;
azure-captest-pgvector)
CONNSTR=${{ secrets.BENCHMARK_PGVECTOR_CONNSTR_AZURE }}
;;
@@ -634,7 +657,7 @@ jobs:
run_in_parallel: false
save_perf_report: ${{ env.SAVE_PERF_REPORT }}
extra_params: -m remote_cluster --timeout 21600 -k test_pgvector_indexing
pg_version: ${{ env.DEFAULT_PG_VERSION }}
pg_version: ${{ env.PG_VERSION }}
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
env:
VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"
@@ -649,7 +672,7 @@ jobs:
run_in_parallel: false
save_perf_report: ${{ env.SAVE_PERF_REPORT }}
extra_params: -m remote_cluster --timeout 21600
pg_version: ${{ env.DEFAULT_PG_VERSION }}
pg_version: ${{ env.PG_VERSION }}
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
env:
BENCHMARK_CONNSTR: ${{ steps.set-up-connstr.outputs.connstr }}
@@ -696,7 +719,7 @@ jobs:
env:
POSTGRES_DISTRIB_DIR: /tmp/neon/pg_install
DEFAULT_PG_VERSION: 16
PG_VERSION: ${{ matrix.pg_version }}
TEST_OUTPUT: /tmp/test_output
TEST_OLAP_COLLECT_EXPLAIN: ${{ github.event.inputs.collect_olap_explain }}
TEST_OLAP_COLLECT_PG_STAT_STATEMENTS: ${{ github.event.inputs.collect_pg_stat_statements }}
@@ -739,7 +762,18 @@ jobs:
run: |
case "${PLATFORM}" in
neonvm-captest-reuse)
CONNSTR=${{ secrets.BENCHMARK_CAPTEST_CLICKBENCH_10M_CONNSTR }}
case "${PG_VERSION}" in
16)
CONNSTR=${{ secrets.BENCHMARK_CAPTEST_CLICKBENCH_10M_CONNSTR_V16 }}
;;
17)
CONNSTR=${{ secrets.BENCHMARK_CAPTEST_CONNSTR_PG17 }}
;;
*)
echo >&2 "Unsupported PG_VERSION=${PG_VERSION} for PLATFORM=${PLATFORM}"
exit 1
;;
esac
;;
rds-aurora)
CONNSTR=${{ secrets.BENCHMARK_RDS_AURORA_CLICKBENCH_10M_CONNSTR }}
@@ -763,7 +797,7 @@ jobs:
run_in_parallel: false
save_perf_report: ${{ env.SAVE_PERF_REPORT }}
extra_params: -m remote_cluster --timeout 43200 -k test_clickbench
pg_version: ${{ env.DEFAULT_PG_VERSION }}
pg_version: ${{ env.PG_VERSION }}
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
env:
VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"
@@ -812,12 +846,11 @@ jobs:
env:
POSTGRES_DISTRIB_DIR: /tmp/neon/pg_install
DEFAULT_PG_VERSION: 16
PG_VERSION: ${{ matrix.pg_version }}
TEST_OUTPUT: /tmp/test_output
BUILD_TYPE: remote
SAVE_PERF_REPORT: ${{ github.event.inputs.save_perf_report || ( github.ref_name == 'main' ) }}
PLATFORM: ${{ matrix.platform }}
TEST_OLAP_SCALE: ${{ matrix.scale }}
runs-on: [ self-hosted, us-east-2, x64 ]
container:
@@ -849,21 +882,31 @@ jobs:
run: |
case "${PLATFORM}" in
neonvm-captest-reuse)
ENV_PLATFORM=CAPTEST_TPCH
case "${PG_VERSION}" in
16)
CONNSTR_SECRET_NAME="BENCHMARK_CAPTEST_TPCH_S10_CONNSTR"
;;
17)
CONNSTR_SECRET_NAME="BENCHMARK_CAPTEST_CONNSTR_PG17"
;;
*)
echo >&2 "Unsupported PG_VERSION=${PG_VERSION} for PLATFORM=${PLATFORM}"
exit 1
;;
esac
;;
rds-aurora)
ENV_PLATFORM=RDS_AURORA_TPCH
CONNSTR_SECRET_NAME="BENCHMARK_RDS_AURORA_TPCH_S10_CONNSTR"
;;
rds-postgres)
ENV_PLATFORM=RDS_POSTGRES_TPCH
CONNSTR_SECRET_NAME="BENCHMARK_RDS_POSTGRES_TPCH_S10_CONNSTR"
;;
*)
echo >&2 "Unknown PLATFORM=${PLATFORM}. Allowed only 'neonvm-captest-reuse', 'rds-aurora', or 'rds-postgres'"
exit 1
;;
esac
CONNSTR_SECRET_NAME="BENCHMARK_${ENV_PLATFORM}_S${TEST_OLAP_SCALE}_CONNSTR"
echo "CONNSTR_SECRET_NAME=${CONNSTR_SECRET_NAME}" >> $GITHUB_ENV
- name: Set up Connection String
@@ -881,13 +924,13 @@ jobs:
run_in_parallel: false
save_perf_report: ${{ env.SAVE_PERF_REPORT }}
extra_params: -m remote_cluster --timeout 21600 -k test_tpch
pg_version: ${{ env.DEFAULT_PG_VERSION }}
pg_version: ${{ env.PG_VERSION }}
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
env:
VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"
PERF_TEST_RESULT_CONNSTR: "${{ secrets.PERF_TEST_RESULT_CONNSTR }}"
BENCHMARK_CONNSTR: ${{ steps.set-up-connstr.outputs.connstr }}
TEST_OLAP_SCALE: ${{ matrix.scale }}
TEST_OLAP_SCALE: 10
- name: Create Allure report
id: create-allure-report
@@ -922,7 +965,7 @@ jobs:
env:
POSTGRES_DISTRIB_DIR: /tmp/neon/pg_install
DEFAULT_PG_VERSION: 16
PG_VERSION: ${{ matrix.pg_version }}
TEST_OUTPUT: /tmp/test_output
BUILD_TYPE: remote
SAVE_PERF_REPORT: ${{ github.event.inputs.save_perf_report || ( github.ref_name == 'main' ) }}
@@ -959,7 +1002,18 @@ jobs:
run: |
case "${PLATFORM}" in
neonvm-captest-reuse)
CONNSTR=${{ secrets.BENCHMARK_USER_EXAMPLE_CAPTEST_CONNSTR }}
case "${PG_VERSION}" in
16)
CONNSTR=${{ secrets.BENCHMARK_USER_EXAMPLE_CAPTEST_CONNSTR }}
;;
17)
CONNSTR=${{ secrets.BENCHMARK_CAPTEST_CONNSTR_PG17 }}
;;
*)
echo >&2 "Unsupported PG_VERSION=${PG_VERSION} for PLATFORM=${PLATFORM}"
exit 1
;;
esac
;;
rds-aurora)
CONNSTR=${{ secrets.BENCHMARK_USER_EXAMPLE_RDS_AURORA_CONNSTR }}
@@ -983,7 +1037,7 @@ jobs:
run_in_parallel: false
save_perf_report: ${{ env.SAVE_PERF_REPORT }}
extra_params: -m remote_cluster --timeout 21600 -k test_user_examples
pg_version: ${{ env.DEFAULT_PG_VERSION }}
pg_version: ${{ env.PG_VERSION }}
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
env:
VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"

View File

@@ -64,6 +64,7 @@ ARG DEFAULT_PG_VERSION
WORKDIR /data
RUN set -e \
&& echo 'Acquire::Retries "5";' > /etc/apt/apt.conf.d/80-retries \
&& apt update \
&& apt install -y \
libreadline-dev \
@@ -72,6 +73,7 @@ RUN set -e \
# System postgres for use with client libraries (e.g. in storage controller)
postgresql-15 \
openssl \
&& rm -f /etc/apt/apt.conf.d/80-retries \
&& rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/* \
&& useradd -d /data neon \
&& chown -R neon:neon /data

View File

@@ -3,6 +3,10 @@ ARG DEBIAN_VERSION=bookworm
FROM debian:bookworm-slim AS pgcopydb_builder
ARG DEBIAN_VERSION
RUN echo 'Acquire::Retries "5";' > /etc/apt/apt.conf.d/80-retries && \
echo -e "retry_connrefused = on\ntimeout=15\ntries=5\n" > /root/.wgetrc \
echo -e "--retry-connrefused\n--connect-timeout 15\n--retry 5\n--max-time 300\n" > /root/.curlrc
RUN if [ "${DEBIAN_VERSION}" = "bookworm" ]; then \
set -e && \
apt update && \
@@ -61,6 +65,10 @@ RUN mkdir -p /pgcopydb/bin && \
COPY --from=pgcopydb_builder /usr/lib/postgresql/16/bin/pgcopydb /pgcopydb/bin/pgcopydb
COPY --from=pgcopydb_builder /pgcopydb/lib/libpq.so.5 /pgcopydb/lib/libpq.so.5
RUN echo 'Acquire::Retries "5";' > /etc/apt/apt.conf.d/80-retries && \
echo -e "retry_connrefused = on\ntimeout=15\ntries=5\n" > /root/.wgetrc \
echo -e "--retry-connrefused\n--connect-timeout 15\n--retry 5\n--max-time 300\n" > /root/.curlrc
# System deps
#
# 'gdb' is included so that we get backtraces of core dumps produced in
@@ -218,6 +226,8 @@ RUN wget -O /tmp/libicu-${ICU_VERSION}.tgz https://github.com/unicode-org/icu/re
USER nonroot:nonroot
WORKDIR /home/nonroot
RUN echo -e "--retry-connrefused\n--connect-timeout 15\n--retry 5\n--max-time 300\n" > /home/nonroot/.curlrc
# Python
ENV PYTHON_VERSION=3.11.10 \
PYENV_ROOT=/home/nonroot/.pyenv \

View File

@@ -18,6 +18,10 @@ ARG DEBIAN_VERSION
# Use strict mode for bash to catch errors early
SHELL ["/bin/bash", "-euo", "pipefail", "-c"]
RUN echo 'Acquire::Retries "5";' > /etc/apt/apt.conf.d/80-retries && \
echo -e "retry_connrefused = on\ntimeout=15\ntries=5\n" > /root/.wgetrc \
echo -e "--retry-connrefused\n--connect-timeout 15\n--retry 5\n--max-time 300\n" > /root/.curlrc
RUN case $DEBIAN_VERSION in \
# Version-specific installs for Bullseye (PG14-PG16):
# The h3_pg extension needs a cmake 3.20+, but Debian bullseye has 3.18.
@@ -838,6 +842,8 @@ ENV PATH="/home/nonroot/.cargo/bin:$PATH"
USER nonroot
WORKDIR /home/nonroot
RUN echo -e "--retry-connrefused\n--connect-timeout 15\n--retry 5\n--max-time 300\n" > /home/nonroot/.curlrc
RUN curl -sSO https://static.rust-lang.org/rustup/dist/$(uname -m)-unknown-linux-gnu/rustup-init && \
chmod +x rustup-init && \
./rustup-init -y --no-modify-path --profile minimal --default-toolchain stable && \
@@ -874,6 +880,8 @@ ENV PATH="/home/nonroot/.cargo/bin:$PATH"
USER nonroot
WORKDIR /home/nonroot
RUN echo -e "--retry-connrefused\n--connect-timeout 15\n--retry 5\n--max-time 300\n" > /home/nonroot/.curlrc
RUN curl -sSO https://static.rust-lang.org/rustup/dist/$(uname -m)-unknown-linux-gnu/rustup-init && \
chmod +x rustup-init && \
./rustup-init -y --no-modify-path --profile minimal --default-toolchain stable && \
@@ -1243,6 +1251,7 @@ RUN mold -run cargo build --locked --profile release-line-debug-size-lto --bin c
FROM debian:$DEBIAN_FLAVOR AS pgbouncer
RUN set -e \
&& echo 'Acquire::Retries "5";' > /etc/apt/apt.conf.d/80-retries \
&& apt update \
&& apt install --no-install-suggests --no-install-recommends -y \
build-essential \
@@ -1444,6 +1453,8 @@ RUN mkdir /usr/local/download_extensions && chown -R postgres:postgres /usr/loca
# libboost* for rdkit
# ca-certificates for communicating with s3 by compute_ctl
RUN echo 'Acquire::Retries "5";' > /etc/apt/apt.conf.d/80-retries && \
echo -e "retry_connrefused = on\ntimeout=15\ntries=5\n" > /root/.wgetrc
RUN apt update && \
case $DEBIAN_VERSION in \
@@ -1500,7 +1511,7 @@ RUN set -ex; \
else \
echo "Unsupported architecture: ${TARGETARCH}"; exit 1; \
fi; \
curl -L "https://awscli.amazonaws.com/awscli-exe-linux-${TARGETARCH_ALT}-2.17.5.zip" -o /tmp/awscliv2.zip; \
curl --retry 5 -L "https://awscli.amazonaws.com/awscli-exe-linux-${TARGETARCH_ALT}-2.17.5.zip" -o /tmp/awscliv2.zip; \
echo "${CHECKSUM} /tmp/awscliv2.zip" | sha256sum -c -; \
unzip /tmp/awscliv2.zip -d /tmp/awscliv2; \
/tmp/awscliv2/aws/install; \

View File

@@ -7,11 +7,12 @@ FROM $REPOSITORY/${COMPUTE_IMAGE}:$TAG
ARG COMPUTE_IMAGE
USER root
RUN apt-get update && \
RUN echo 'Acquire::Retries "5";' > /etc/apt/apt.conf.d/80-retries && \
apt-get update && \
apt-get install -y curl \
jq \
netcat-openbsd
#This is required for the pg_hintplan test
RUN mkdir -p /ext-src/pg_hint_plan-src && chown postgres /ext-src/pg_hint_plan-src
RUN mkdir -p /ext-src/pg_hint_plan-src && chown postgres /ext-src/pg_hint_plan-src
USER postgres
USER postgres

View File

@@ -1,4 +1,13 @@
use std::collections::HashMap;
use std::num::NonZeroUsize;
use std::pin::Pin;
use std::sync::atomic::AtomicU64;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
use enum_map::EnumMap;
use futures::Future;
use metrics::{
register_counter_vec, register_gauge_vec, register_histogram, register_histogram_vec,
register_int_counter, register_int_counter_pair_vec, register_int_counter_vec,
@@ -11,13 +20,26 @@ use pageserver_api::config::{
PageServicePipeliningConfig, PageServicePipeliningConfigPipelined,
PageServiceProtocolPipelinedExecutionStrategy,
};
use pageserver_api::models::InMemoryLayerInfo;
use pageserver_api::shard::TenantShardId;
use pin_project_lite::pin_project;
use postgres_backend::{is_expected_io_error, QueryError};
use pq_proto::framed::ConnectionError;
use strum::{EnumCount, VariantNames};
use strum::{EnumCount, IntoEnumIterator as _, VariantNames};
use strum_macros::{IntoStaticStr, VariantNames};
use utils::id::TimelineId;
use crate::config::PageServerConf;
use crate::context::{PageContentKind, RequestContext};
use crate::task_mgr::TaskKind;
use crate::tenant::layer_map::LayerMap;
use crate::tenant::mgr::TenantSlot;
use crate::tenant::storage_layer::{InMemoryLayer, PersistentLayerDesc};
use crate::tenant::tasks::BackgroundLoopKind;
use crate::tenant::throttle::ThrottleResult;
use crate::tenant::Timeline;
/// Prometheus histogram buckets (in seconds) for operations in the critical
/// path. In other words, operations that directly affect that latency of user
/// queries.
@@ -443,18 +465,38 @@ static PITR_HISTORY_SIZE: Lazy<UIntGaugeVec> = Lazy::new(|| {
.expect("failed to define a metric")
});
#[derive(strum_macros::EnumString, strum_macros::Display, strum_macros::IntoStaticStr)]
#[derive(
strum_macros::EnumIter,
strum_macros::EnumString,
strum_macros::Display,
strum_macros::IntoStaticStr,
)]
#[strum(serialize_all = "kebab_case")]
pub(crate) enum MetricLayerKind {
pub(crate) enum LayerKind {
Delta,
Image,
}
#[derive(
strum_macros::EnumIter,
strum_macros::EnumString,
strum_macros::Display,
strum_macros::IntoStaticStr,
)]
#[strum(serialize_all = "kebab_case")]
pub(crate) enum LayerLevel {
// We don't track the currently open ephemeral layer, since there's always exactly 1 and its
// size changes. See `TIMELINE_EPHEMERAL_BYTES`.
Frozen,
L0,
L1,
}
static TIMELINE_LAYER_SIZE: Lazy<UIntGaugeVec> = Lazy::new(|| {
register_uint_gauge_vec!(
"pageserver_layer_bytes",
"Sum of layer physical sizes in bytes",
&["tenant_id", "shard_id", "timeline_id", "kind"]
"Sum of frozen, L0, and L1 layer physical sizes in bytes (excluding the open ephemeral layer)",
&["tenant_id", "shard_id", "timeline_id", "level", "kind"]
)
.expect("failed to define a metric")
});
@@ -462,8 +504,8 @@ static TIMELINE_LAYER_SIZE: Lazy<UIntGaugeVec> = Lazy::new(|| {
static TIMELINE_LAYER_COUNT: Lazy<UIntGaugeVec> = Lazy::new(|| {
register_uint_gauge_vec!(
"pageserver_layer_count",
"Number of layers that exist",
&["tenant_id", "shard_id", "timeline_id", "kind"]
"Number of frozen, L0, and L1 layers (excluding the open ephemeral layer)",
&["tenant_id", "shard_id", "timeline_id", "level", "kind"]
)
.expect("failed to define a metric")
});
@@ -2590,10 +2632,6 @@ pub(crate) struct TimelineMetrics {
pub disk_consistent_lsn_gauge: IntGauge,
pub pitr_history_size: UIntGauge,
pub archival_size: UIntGauge,
pub(crate) layer_size_image: UIntGauge,
pub(crate) layer_count_image: UIntGauge,
pub(crate) layer_size_delta: UIntGauge,
pub(crate) layer_count_delta: UIntGauge,
pub standby_horizon_gauge: IntGauge,
pub resident_physical_size_gauge: UIntGauge,
pub visible_physical_size_gauge: UIntGauge,
@@ -2691,42 +2729,6 @@ impl TimelineMetrics {
.get_metric_with_label_values(&[&tenant_id, &shard_id, &timeline_id])
.unwrap();
let layer_size_image = TIMELINE_LAYER_SIZE
.get_metric_with_label_values(&[
&tenant_id,
&shard_id,
&timeline_id,
MetricLayerKind::Image.into(),
])
.unwrap();
let layer_count_image = TIMELINE_LAYER_COUNT
.get_metric_with_label_values(&[
&tenant_id,
&shard_id,
&timeline_id,
MetricLayerKind::Image.into(),
])
.unwrap();
let layer_size_delta = TIMELINE_LAYER_SIZE
.get_metric_with_label_values(&[
&tenant_id,
&shard_id,
&timeline_id,
MetricLayerKind::Delta.into(),
])
.unwrap();
let layer_count_delta = TIMELINE_LAYER_COUNT
.get_metric_with_label_values(&[
&tenant_id,
&shard_id,
&timeline_id,
MetricLayerKind::Delta.into(),
])
.unwrap();
let standby_horizon_gauge = STANDBY_HORIZON
.get_metric_with_label_values(&[&tenant_id, &shard_id, &timeline_id])
.unwrap();
@@ -2791,10 +2793,6 @@ impl TimelineMetrics {
disk_consistent_lsn_gauge,
pitr_history_size,
archival_size,
layer_size_image,
layer_count_image,
layer_size_delta,
layer_count_delta,
standby_horizon_gauge,
resident_physical_size_gauge,
visible_physical_size_gauge,
@@ -2837,6 +2835,92 @@ impl TimelineMetrics {
.add(duration);
}
/// Generates TIMELINE_LAYER labels for a persistent layer.
fn make_layer_labels(&self, layer_desc: &PersistentLayerDesc) -> [&str; 5] {
let level = match LayerMap::is_l0(&layer_desc.key_range, layer_desc.is_delta()) {
true => LayerLevel::L0,
false => LayerLevel::L1,
};
let kind = match layer_desc.is_delta() {
true => LayerKind::Delta,
false => LayerKind::Image,
};
[
&self.tenant_id,
&self.shard_id,
&self.timeline_id,
level.into(),
kind.into(),
]
}
/// Generates TIMELINE_LAYER labels for a frozen ephemeral layer.
fn make_frozen_layer_labels(&self, _layer: &InMemoryLayer) -> [&str; 5] {
[
&self.tenant_id,
&self.shard_id,
&self.timeline_id,
LayerLevel::Frozen.into(),
LayerKind::Delta.into(), // by definition
]
}
/// Removes a frozen ephemeral layer to TIMELINE_LAYER metrics.
pub fn dec_frozen_layer(&self, layer: &InMemoryLayer) {
assert!(matches!(layer.info(), InMemoryLayerInfo::Frozen { .. }));
let labels = self.make_frozen_layer_labels(layer);
let size = layer.try_len().expect("frozen layer should have no writer");
TIMELINE_LAYER_COUNT
.get_metric_with_label_values(&labels)
.unwrap()
.dec();
TIMELINE_LAYER_SIZE
.get_metric_with_label_values(&labels)
.unwrap()
.sub(size);
}
/// Adds a frozen ephemeral layer to TIMELINE_LAYER metrics.
pub fn inc_frozen_layer(&self, layer: &InMemoryLayer) {
assert!(matches!(layer.info(), InMemoryLayerInfo::Frozen { .. }));
let labels = self.make_frozen_layer_labels(layer);
let size = layer.try_len().expect("frozen layer should have no writer");
TIMELINE_LAYER_COUNT
.get_metric_with_label_values(&labels)
.unwrap()
.inc();
TIMELINE_LAYER_SIZE
.get_metric_with_label_values(&labels)
.unwrap()
.add(size);
}
/// Removes a persistent layer from TIMELINE_LAYER metrics.
pub fn dec_layer(&self, layer_desc: &PersistentLayerDesc) {
let labels = self.make_layer_labels(layer_desc);
TIMELINE_LAYER_COUNT
.get_metric_with_label_values(&labels)
.unwrap()
.dec();
TIMELINE_LAYER_SIZE
.get_metric_with_label_values(&labels)
.unwrap()
.sub(layer_desc.file_size);
}
/// Adds a persistent layer to TIMELINE_LAYER metrics.
pub fn inc_layer(&self, layer_desc: &PersistentLayerDesc) {
let labels = self.make_layer_labels(layer_desc);
TIMELINE_LAYER_COUNT
.get_metric_with_label_values(&labels)
.unwrap()
.inc();
TIMELINE_LAYER_SIZE
.get_metric_with_label_values(&labels)
.unwrap()
.add(layer_desc.file_size);
}
pub(crate) fn shutdown(&self) {
let was_shutdown = self
.shutdown
@@ -2869,30 +2953,14 @@ impl TimelineMetrics {
let _ = TIMELINE_ARCHIVE_SIZE.remove_label_values(&[tenant_id, shard_id, timeline_id]);
let _ = PITR_HISTORY_SIZE.remove_label_values(&[tenant_id, shard_id, timeline_id]);
let _ = TIMELINE_LAYER_SIZE.remove_label_values(&[
tenant_id,
shard_id,
timeline_id,
MetricLayerKind::Image.into(),
]);
let _ = TIMELINE_LAYER_COUNT.remove_label_values(&[
tenant_id,
shard_id,
timeline_id,
MetricLayerKind::Image.into(),
]);
let _ = TIMELINE_LAYER_SIZE.remove_label_values(&[
tenant_id,
shard_id,
timeline_id,
MetricLayerKind::Delta.into(),
]);
let _ = TIMELINE_LAYER_COUNT.remove_label_values(&[
tenant_id,
shard_id,
timeline_id,
MetricLayerKind::Delta.into(),
]);
for ref level in LayerLevel::iter() {
for ref kind in LayerKind::iter() {
let labels: [&str; 5] =
[tenant_id, shard_id, timeline_id, level.into(), kind.into()];
let _ = TIMELINE_LAYER_SIZE.remove_label_values(&labels);
let _ = TIMELINE_LAYER_COUNT.remove_label_values(&labels);
}
}
let _ = EVICTIONS.remove_label_values(&[tenant_id, shard_id, timeline_id]);
let _ = AUX_FILE_SIZE.remove_label_values(&[tenant_id, shard_id, timeline_id]);
@@ -2974,24 +3042,6 @@ pub(crate) fn remove_tenant_metrics(tenant_shard_id: &TenantShardId) {
// we leave the BROKEN_TENANTS_SET entry if any
}
use futures::Future;
use pin_project_lite::pin_project;
use std::collections::HashMap;
use std::num::NonZeroUsize;
use std::pin::Pin;
use std::sync::atomic::AtomicU64;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
use crate::config::PageServerConf;
use crate::context::{PageContentKind, RequestContext};
use crate::task_mgr::TaskKind;
use crate::tenant::mgr::TenantSlot;
use crate::tenant::tasks::BackgroundLoopKind;
use crate::tenant::throttle::ThrottleResult;
use crate::tenant::Timeline;
/// Maintain a per timeline gauge in addition to the global gauge.
pub(crate) struct PerTimelineRemotePhysicalSizeGauge {
last_set: AtomicU64,

View File

@@ -701,13 +701,7 @@ impl Drop for LayerInner {
if let Some(timeline) = timeline.as_ref() {
// Only need to decrement metrics if the timeline still exists: otherwise
// it will have already de-registered these metrics via TimelineMetrics::shutdown
if self.desc.is_delta() {
timeline.metrics.layer_count_delta.dec();
timeline.metrics.layer_size_delta.sub(self.desc.file_size);
} else {
timeline.metrics.layer_count_image.dec();
timeline.metrics.layer_size_image.sub(self.desc.file_size);
}
timeline.metrics.dec_layer(&self.desc);
if matches!(self.access_stats.visibility(), LayerVisibilityHint::Visible) {
debug_assert!(
@@ -817,13 +811,7 @@ impl LayerInner {
};
// This object acts as a RAII guard on these metrics: increment on construction
if desc.is_delta() {
timeline.metrics.layer_count_delta.inc();
timeline.metrics.layer_size_delta.add(desc.file_size);
} else {
timeline.metrics.layer_count_image.inc();
timeline.metrics.layer_size_image.add(desc.file_size);
}
timeline.metrics.inc_layer(&desc);
// New layers are visible by default. This metric is later updated on drop or in set_visibility
timeline

View File

@@ -3703,7 +3703,7 @@ impl Timeline {
let mut guard = self.layers.write().await;
guard
.open_mut()?
.try_freeze_in_memory_layer(at, &self.last_freeze_at, write_lock)
.try_freeze_in_memory_layer(at, &self.last_freeze_at, write_lock, &self.metrics)
.await
};

View File

@@ -624,7 +624,13 @@ impl Timeline {
// High level strategy for compaction / image creation:
//
// 1. First, calculate the desired "partitioning" of the
// 1. First, do a L0 compaction to ensure we move the L0
// layers into the historic layer map get flat levels of
// layers. If we did not compact all L0 layers, we will
// prioritize compacting the timeline again and not do
// any of the compactions below.
//
// 2. Then, calculate the desired "partitioning" of the
// currently in-use key space. The goal is to partition the
// key space into roughly fixed-size chunks, but also take into
// account any existing image layers, and try to align the
@@ -638,7 +644,7 @@ impl Timeline {
// identify a relation. This is just an optimization,
// though.
//
// 2. Once we know the partitioning, for each partition,
// 3. Once we know the partitioning, for each partition,
// decide if it's time to create a new image layer. The
// criteria is: there has been too much "churn" since the last
// image layer? The "churn" is fuzzy concept, it's a
@@ -646,15 +652,8 @@ impl Timeline {
// total in the delta file. Or perhaps: if creating an image
// file would allow to delete some older files.
//
// 3. After that, we compact all level0 delta files if there
// are too many of them. While compacting, we also garbage
// collect any page versions that are no longer needed because
// of the new image layers we created in step 2.
//
// TODO: This high level strategy hasn't been implemented yet.
// Below are functions compact_level0() and create_image_layers()
// but they are a bit ad hoc and don't quite work like it's explained
// above. Rewrite it.
// 4. In the end, if the tenant gets auto-sharded, we will run
// a shard-ancestor compaction.
// Is the timeline being deleted?
if self.is_stopping() {
@@ -666,10 +665,32 @@ impl Timeline {
// Define partitioning schema if needed
// FIXME: the match should only cover repartitioning, not the next steps
let (partition_count, has_pending_tasks) = match self
// 1. L0 Compact
let fully_compacted = {
let timer = self.metrics.compact_time_histo.start_timer();
let fully_compacted = self
.compact_level0(
target_file_size,
options.flags.contains(CompactFlags::ForceL0Compaction),
ctx,
)
.await?;
timer.stop_and_record();
fully_compacted
};
if !fully_compacted {
// Yield and do not do any other kind of compaction. True means
// that we have pending L0 compaction tasks and the compaction scheduler
// will prioritize compacting this tenant/timeline again.
info!("skipping image layer generation and shard ancestor compaction due to L0 compaction did not include all layers.");
return Ok(true);
}
// 2. Repartition and create image layers if necessary
let partition_count = match self
.repartition(
self.get_last_record_lsn(),
self.get_last_record_lsn(), // TODO: use L0-L1 boundary
self.get_compaction_target_size(),
options.flags,
ctx,
@@ -682,46 +703,30 @@ impl Timeline {
.access_stats_behavior(AccessStatsBehavior::Skip)
.build();
// 2. Compact
let timer = self.metrics.compact_time_histo.start_timer();
let fully_compacted = self
.compact_level0(
target_file_size,
options.flags.contains(CompactFlags::ForceL0Compaction),
ctx,
)
.await?;
timer.stop_and_record();
let mut partitioning = dense_partitioning;
partitioning
.parts
.extend(sparse_partitioning.into_dense().parts);
// 3. Create new image layers for partitions that have been modified
// "enough". Skip image layer creation if L0 compaction cannot keep up.
if fully_compacted {
let image_layers = self
.create_image_layers(
&partitioning,
lsn,
if options
.flags
.contains(CompactFlags::ForceImageLayerCreation)
{
ImageLayerCreationMode::Force
} else {
ImageLayerCreationMode::Try
},
&image_ctx,
)
.await?;
// 3. Create new image layers for partitions that have been modified "enough".
let image_layers = self
.create_image_layers(
&partitioning,
lsn,
if options
.flags
.contains(CompactFlags::ForceImageLayerCreation)
{
ImageLayerCreationMode::Force
} else {
ImageLayerCreationMode::Try
},
&image_ctx,
)
.await?;
self.upload_new_image_layers(image_layers)?;
} else {
info!("skipping image layer generation due to L0 compaction did not include all layers.");
}
(partitioning.parts.len(), !fully_compacted)
self.upload_new_image_layers(image_layers)?;
partitioning.parts.len()
}
Err(err) => {
// no partitioning? This is normal, if the timeline was just created
@@ -733,10 +738,12 @@ impl Timeline {
if !self.cancel.is_cancelled() && !err.is_cancelled() {
tracing::error!("could not compact, repartitioning keyspace failed: {err:?}");
}
(1, false)
1
}
};
// 4. Shard ancestor compaction
if self.shard_identity.count >= ShardCount::new(2) {
// Limit the number of layer rewrites to the number of partitions: this means its
// runtime should be comparable to a full round of image layer creations, rather than
@@ -746,7 +753,7 @@ impl Timeline {
self.compact_shard_ancestors(rewrite_max, ctx).await?;
}
Ok(has_pending_tasks)
Ok(false)
}
/// Check for layers that are elegible to be rewritten:

View File

@@ -91,6 +91,7 @@ impl LayerManager {
layer_map,
layer_fmgr: LayerFileManager(hashmap),
}) => {
// NB: no need to decrement layer metrics; metrics are removed on timeline shutdown.
let open = layer_map.open_layer.take();
let frozen = layer_map.frozen_layers.len();
let taken_writer_state = writer_state.take();
@@ -234,6 +235,7 @@ impl OpenLayerManager {
lsn: Lsn,
last_freeze_at: &AtomicLsn,
write_lock: &mut tokio::sync::MutexGuard<'_, Option<TimelineWriterState>>,
metrics: &TimelineMetrics,
) -> bool {
let Lsn(last_record_lsn) = lsn;
let end_lsn = Lsn(last_record_lsn + 1);
@@ -242,6 +244,11 @@ impl OpenLayerManager {
let open_layer_rc = Arc::clone(open_layer);
open_layer.freeze(end_lsn).await;
// Increment the frozen layer metrics. This is decremented in `finish_flush_l0_layer()`.
// TODO: It would be nicer to do this via `InMemoryLayer::drop()`, but it requires a
// reference to the timeline metrics. Other methods use a metrics borrow as well.
metrics.inc_frozen_layer(open_layer);
// The layer is no longer open, update the layer map to reflect this.
// We will replace it with on-disk historics below.
self.layer_map.frozen_layers.push_back(open_layer_rc);
@@ -298,6 +305,7 @@ impl OpenLayerManager {
.frozen_layers
.pop_front()
.expect("there must be a inmem layer to flush");
metrics.dec_frozen_layer(&inmem);
// Only one task may call this function at a time (for this
// timeline). If two tasks tried to flush the same frozen

View File

@@ -505,13 +505,6 @@ async fn main() -> anyhow::Result<()> {
}
}
if let Some(mut redis_kv_client) = redis_kv_client {
maintenance_tasks.spawn(async move {
redis_kv_client.try_connect().await?;
handle_cancel_messages(&mut redis_kv_client, rx_cancel).await
});
}
if let Some(regional_redis_client) = regional_redis_client {
let cache = api.caches.endpoints_cache.clone();
let con = regional_redis_client;
@@ -524,6 +517,15 @@ async fn main() -> anyhow::Result<()> {
}
}
if let Some(mut redis_kv_client) = redis_kv_client {
maintenance_tasks.spawn(async move {
if let Err(err) = redis_kv_client.try_connect().await {
tracing::error!(?err, "could not connect to redis")
}
handle_cancel_messages(&mut redis_kv_client, rx_cancel).await
});
}
let maintenance = loop {
// get one complete task
match futures::future::select(

View File

@@ -1,114 +0,0 @@
use core::net::IpAddr;
use std::sync::Arc;
use pq_proto::CancelKeyData;
use tokio::sync::Mutex;
use uuid::Uuid;
use super::connection_with_credentials_provider::ConnectionWithCredentialsProvider;
use crate::rate_limiter::{GlobalRateLimiter, RateBucketInfo};
pub trait CancellationPublisherMut: Send + Sync + 'static {
#[allow(async_fn_in_trait)]
async fn try_publish(
&mut self,
cancel_key_data: CancelKeyData,
session_id: Uuid,
peer_addr: IpAddr,
) -> anyhow::Result<()>;
}
pub trait CancellationPublisher: Send + Sync + 'static {
#[allow(async_fn_in_trait)]
async fn try_publish(
&self,
cancel_key_data: CancelKeyData,
session_id: Uuid,
peer_addr: IpAddr,
) -> anyhow::Result<()>;
}
impl CancellationPublisher for () {
async fn try_publish(
&self,
_cancel_key_data: CancelKeyData,
_session_id: Uuid,
_peer_addr: IpAddr,
) -> anyhow::Result<()> {
Ok(())
}
}
impl<P: CancellationPublisher> CancellationPublisherMut for P {
async fn try_publish(
&mut self,
cancel_key_data: CancelKeyData,
session_id: Uuid,
peer_addr: IpAddr,
) -> anyhow::Result<()> {
<P as CancellationPublisher>::try_publish(self, cancel_key_data, session_id, peer_addr)
.await
}
}
impl<P: CancellationPublisher> CancellationPublisher for Option<P> {
async fn try_publish(
&self,
cancel_key_data: CancelKeyData,
session_id: Uuid,
peer_addr: IpAddr,
) -> anyhow::Result<()> {
if let Some(p) = self {
p.try_publish(cancel_key_data, session_id, peer_addr).await
} else {
Ok(())
}
}
}
impl<P: CancellationPublisherMut> CancellationPublisher for Arc<Mutex<P>> {
async fn try_publish(
&self,
cancel_key_data: CancelKeyData,
session_id: Uuid,
peer_addr: IpAddr,
) -> anyhow::Result<()> {
self.lock()
.await
.try_publish(cancel_key_data, session_id, peer_addr)
.await
}
}
pub struct RedisPublisherClient {
#[allow(dead_code)]
client: ConnectionWithCredentialsProvider,
_region_id: String,
_limiter: GlobalRateLimiter,
}
impl RedisPublisherClient {
pub fn new(
client: ConnectionWithCredentialsProvider,
region_id: String,
info: &'static [RateBucketInfo],
) -> anyhow::Result<Self> {
Ok(Self {
client,
_region_id: region_id,
_limiter: GlobalRateLimiter::new(info.into()),
})
}
#[allow(dead_code)]
pub(crate) async fn try_connect(&mut self) -> anyhow::Result<()> {
match self.client.connect().await {
Ok(()) => {}
Err(e) => {
tracing::error!("failed to connect to redis: {e}");
return Err(e);
}
}
Ok(())
}
}

View File

@@ -1,4 +1,3 @@
pub mod cancellation_publisher;
pub mod connection_with_credentials_provider;
pub mod elasticache;
pub mod keys;