Compare commits

..

1 Commits

Author SHA1 Message Date
Konstantin Knizhnik
54df1c5411 Make it possible to reenable LFC 2025-02-05 21:26:41 +02:00
69 changed files with 1009 additions and 1641 deletions

View File

@@ -24,4 +24,3 @@
!storage_controller/
!vendor/postgres-*/
!workspace_hack/
!build_tools/patches

View File

@@ -121,8 +121,6 @@ runs:
export DEFAULT_PG_VERSION=${PG_VERSION#v}
export LD_LIBRARY_PATH=${POSTGRES_DISTRIB_DIR}/v${DEFAULT_PG_VERSION}/lib
export BENCHMARK_CONNSTR=${BENCHMARK_CONNSTR:-}
export ASAN_OPTIONS=detect_leaks=0:detect_stack_use_after_return=0:abort_on_error=1:strict_string_checks=1:check_initialization_order=1:strict_init_order=1
export UBSAN_OPTIONS=abort_on_error=1:print_stacktrace=1
if [ "${BUILD_TYPE}" = "remote" ]; then
export REMOTE_ENV=1

View File

@@ -20,7 +20,7 @@ on:
required: true
type: string
test-cfg:
description: 'a json object of postgres versions and lfc/sanitizers states to build and run regression tests on'
description: 'a json object of postgres versions and lfc states to run regression tests on'
required: true
type: string
@@ -48,8 +48,6 @@ jobs:
# io_uring will account the memory of the CQ and SQ as locked.
# More details: https://github.com/neondatabase/neon/issues/6373#issuecomment-1905814391
options: --init --shm-size=512mb --ulimit memlock=67108864:67108864
strategy:
matrix: ${{ fromJSON(format('{{"include":{0}}}', inputs.test-cfg)) }}
env:
BUILD_TYPE: ${{ inputs.build-type }}
GIT_VERSION: ${{ github.event.pull_request.head.sha || github.sha }}
@@ -89,7 +87,6 @@ jobs:
- name: Set env variables
env:
ARCH: ${{ inputs.arch }}
SANITIZERS: ${{ matrix.sanitizers }}
run: |
CARGO_FEATURES="--features testing"
if [[ $BUILD_TYPE == "debug" && $ARCH == 'x64' ]]; then
@@ -102,14 +99,8 @@ jobs:
cov_prefix=""
CARGO_FLAGS="--locked --release"
fi
if [[ $SANITIZERS == 'enabled' ]]; then
make_vars="WITH_SANITIZERS=yes"
else
make_vars=""
fi
{
echo "cov_prefix=${cov_prefix}"
echo "make_vars=${make_vars}"
echo "CARGO_FEATURES=${CARGO_FEATURES}"
echo "CARGO_FLAGS=${CARGO_FLAGS}"
echo "CARGO_HOME=${GITHUB_WORKSPACE}/.cargo"
@@ -145,39 +136,35 @@ jobs:
- name: Build postgres v14
if: steps.cache_pg_14.outputs.cache-hit != 'true'
run: mold -run make ${make_vars} postgres-v14 -j$(nproc)
run: mold -run make postgres-v14 -j$(nproc)
- name: Build postgres v15
if: steps.cache_pg_15.outputs.cache-hit != 'true'
run: mold -run make ${make_vars} postgres-v15 -j$(nproc)
run: mold -run make postgres-v15 -j$(nproc)
- name: Build postgres v16
if: steps.cache_pg_16.outputs.cache-hit != 'true'
run: mold -run make ${make_vars} postgres-v16 -j$(nproc)
run: mold -run make postgres-v16 -j$(nproc)
- name: Build postgres v17
if: steps.cache_pg_17.outputs.cache-hit != 'true'
run: mold -run make ${make_vars} postgres-v17 -j$(nproc)
run: mold -run make postgres-v17 -j$(nproc)
- name: Build neon extensions
run: mold -run make ${make_vars} neon-pg-ext -j$(nproc)
run: mold -run make neon-pg-ext -j$(nproc)
- name: Build walproposer-lib
run: mold -run make ${make_vars} walproposer-lib -j$(nproc)
run: mold -run make walproposer-lib -j$(nproc)
- name: Run cargo build
env:
WITH_TESTS: ${{ matrix.sanitizers != 'enabled' && '--tests' || '' }}
run: |
export ASAN_OPTIONS=detect_leaks=0
${cov_prefix} mold -run cargo build $CARGO_FLAGS $CARGO_FEATURES --bins ${WITH_TESTS}
${cov_prefix} mold -run cargo build $CARGO_FLAGS $CARGO_FEATURES --bins --tests
# Do install *before* running rust tests because they might recompile the
# binaries with different features/flags.
- name: Install rust binaries
env:
ARCH: ${{ inputs.arch }}
SANITIZERS: ${{ matrix.sanitizers }}
run: |
# Install target binaries
mkdir -p /tmp/neon/bin/
@@ -192,7 +179,7 @@ jobs:
done
# Install test executables and write list of all binaries (for code coverage)
if [[ $BUILD_TYPE == "debug" && $ARCH == 'x64' && $SANITIZERS != 'enabled' ]]; then
if [[ $BUILD_TYPE == "debug" && $ARCH == 'x64' ]]; then
# Keep bloated coverage data files away from the rest of the artifact
mkdir -p /tmp/coverage/
@@ -225,7 +212,6 @@ jobs:
role-duration-seconds: 18000 # 5 hours
- name: Run rust tests
if: ${{ matrix.sanitizers != 'enabled' }}
env:
NEXTEST_RETRIES: 3
run: |
@@ -287,7 +273,6 @@ jobs:
DATABASE_URL: postgresql://localhost:1235/storage_controller
POSTGRES_DISTRIB_DIR: /tmp/neon/pg_install
run: |
export ASAN_OPTIONS=detect_leaks=0
/tmp/neon/bin/neon_local init
/tmp/neon/bin/neon_local storage_controller start
@@ -334,7 +319,7 @@ jobs:
- name: Pytest regression tests
continue-on-error: ${{ matrix.lfc_state == 'with-lfc' && inputs.build-type == 'debug' }}
uses: ./.github/actions/run-python-test-set
timeout-minutes: ${{ matrix.sanitizers != 'enabled' && 60 || 180 }}
timeout-minutes: 60
with:
build_type: ${{ inputs.build-type }}
test_selection: regress
@@ -352,7 +337,6 @@ jobs:
PAGESERVER_VIRTUAL_FILE_IO_ENGINE: tokio-epoll-uring
PAGESERVER_GET_VECTORED_CONCURRENT_IO: sidecar-task
USE_LFC: ${{ matrix.lfc_state == 'with-lfc' && 'true' || 'false' }}
SANITIZERS: ${{ matrix.sanitizers }}
# Temporary disable this step until we figure out why it's so flaky
# Ref https://github.com/neondatabase/neon/issues/4540

View File

@@ -67,9 +67,9 @@ jobs:
- uses: actions/checkout@v4
with:
ref: ${{ github.event.pull_request.head.sha }}
ref: main
token: ${{ secrets.CI_ACCESS_TOKEN }}
- name: Look for existing PR
id: get-pr
env:
@@ -77,7 +77,7 @@ jobs:
run: |
ALREADY_CREATED="$(gh pr --repo ${GITHUB_REPOSITORY} list --head ${BRANCH} --base main --json number --jq '.[].number')"
echo "ALREADY_CREATED=${ALREADY_CREATED}" >> ${GITHUB_OUTPUT}
- name: Get changed labels
id: get-labels
if: steps.get-pr.outputs.ALREADY_CREATED != ''
@@ -94,6 +94,10 @@ jobs:
echo "LABELS_TO_ADD=${LABELS_TO_ADD}" >> ${GITHUB_OUTPUT}
echo "LABELS_TO_REMOVE=${LABELS_TO_REMOVE}" >> ${GITHUB_OUTPUT}
- uses: actions/checkout@v4
with:
ref: ${{ github.event.pull_request.head.sha }}
- run: git checkout -b "${BRANCH}"
- run: git push --force origin "${BRANCH}"
@@ -101,7 +105,7 @@ jobs:
- name: Create a Pull Request for CI run (if required)
if: steps.get-pr.outputs.ALREADY_CREATED == ''
env:
env:
GH_TOKEN: ${{ secrets.CI_ACCESS_TOKEN }}
run: |
cat << EOF > body.md
@@ -138,7 +142,7 @@ jobs:
- run: git push --force origin "${BRANCH}"
if: steps.get-pr.outputs.ALREADY_CREATED != ''
cleanup:
# Close PRs and delete branchs if the original PR is closed.

View File

@@ -1,133 +0,0 @@
name: Build and Test with Sanitizers
on:
schedule:
# * is a special character in YAML so you have to quote this string
# ┌───────────── minute (0 - 59)
# │ ┌───────────── hour (0 - 23)
# │ │ ┌───────────── day of the month (1 - 31)
# │ │ │ ┌───────────── month (1 - 12 or JAN-DEC)
# │ │ │ │ ┌───────────── day of the week (0 - 6 or SUN-SAT)
- cron: '0 1 * * *' # run once a day, timezone is utc
workflow_dispatch:
defaults:
run:
shell: bash -euxo pipefail {0}
concurrency:
# Allow only one workflow per any non-`main` branch.
group: ${{ github.workflow }}-${{ github.ref_name }}-${{ github.ref_name == 'main' && github.sha || 'anysha' }}
cancel-in-progress: true
env:
RUST_BACKTRACE: 1
COPT: '-Werror'
jobs:
tag:
runs-on: [ self-hosted, small ]
container: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/base:pinned
outputs:
build-tag: ${{steps.build-tag.outputs.tag}}
steps:
# Need `fetch-depth: 0` to count the number of commits in the branch
- uses: actions/checkout@v4
with:
fetch-depth: 0
- name: Get build tag
run: |
echo run:$GITHUB_RUN_ID
echo ref:$GITHUB_REF_NAME
echo rev:$(git rev-list --count HEAD)
if [[ "$GITHUB_REF_NAME" == "main" ]]; then
echo "tag=$(git rev-list --count HEAD)" >> $GITHUB_OUTPUT
elif [[ "$GITHUB_REF_NAME" == "release" ]]; then
echo "tag=release-$(git rev-list --count HEAD)" >> $GITHUB_OUTPUT
elif [[ "$GITHUB_REF_NAME" == "release-proxy" ]]; then
echo "tag=release-proxy-$(git rev-list --count HEAD)" >> $GITHUB_OUTPUT
elif [[ "$GITHUB_REF_NAME" == "release-compute" ]]; then
echo "tag=release-compute-$(git rev-list --count HEAD)" >> $GITHUB_OUTPUT
else
echo "GITHUB_REF_NAME (value '$GITHUB_REF_NAME') is not set to either 'main' or 'release', 'release-proxy', 'release-compute'"
echo "tag=$GITHUB_RUN_ID" >> $GITHUB_OUTPUT
fi
shell: bash
id: build-tag
build-build-tools-image:
uses: ./.github/workflows/build-build-tools-image.yml
secrets: inherit
build-and-test-locally:
needs: [ tag, build-build-tools-image ]
strategy:
fail-fast: false
matrix:
arch: [ x64, arm64 ]
build-type: [ release ]
uses: ./.github/workflows/_build-and-test-locally.yml
with:
arch: ${{ matrix.arch }}
build-tools-image: ${{ needs.build-build-tools-image.outputs.image }}-bookworm
build-tag: ${{ needs.tag.outputs.build-tag }}
build-type: ${{ matrix.build-type }}
test-cfg: '[{"pg_version":"v17", "sanitizers": "enabled"}]'
secrets: inherit
create-test-report:
needs: [ build-and-test-locally, build-build-tools-image ]
if: ${{ !cancelled() }}
permissions:
id-token: write # aws-actions/configure-aws-credentials
statuses: write
contents: write
pull-requests: write
outputs:
report-url: ${{ steps.create-allure-report.outputs.report-url }}
runs-on: [ self-hosted, small ]
container:
image: ${{ needs.build-build-tools-image.outputs.image }}-bookworm
credentials:
username: ${{ secrets.NEON_DOCKERHUB_USERNAME }}
password: ${{ secrets.NEON_DOCKERHUB_PASSWORD }}
options: --init
steps:
- uses: actions/checkout@v4
- name: Create Allure report
if: ${{ !cancelled() }}
id: create-allure-report
uses: ./.github/actions/allure-report-generate
with:
store-test-results-into-db: true
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
env:
REGRESS_TEST_RESULT_CONNSTR_NEW: ${{ secrets.REGRESS_TEST_RESULT_CONNSTR_NEW }}
- uses: actions/github-script@v7
if: ${{ !cancelled() }}
with:
# Retry script for 5XX server errors: https://github.com/actions/github-script#retries
retries: 5
script: |
const report = {
reportUrl: "${{ steps.create-allure-report.outputs.report-url }}",
reportJsonUrl: "${{ steps.create-allure-report.outputs.report-json-url }}",
}
const coverage = {}
const script = require("./scripts/comment-test-report.js")
await script({
github,
context,
fetch,
report,
coverage,
})

75
Cargo.lock generated
View File

@@ -300,9 +300,9 @@ checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa"
[[package]]
name = "aws-config"
version = "1.5.10"
version = "1.5.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9b49afaa341e8dd8577e1a2200468f98956d6eda50bcf4a53246cc00174ba924"
checksum = "dc47e70fc35d054c8fcd296d47a61711f043ac80534a10b4f741904f81e73a90"
dependencies = [
"aws-credential-types",
"aws-runtime",
@@ -311,7 +311,7 @@ dependencies = [
"aws-sdk-sts",
"aws-smithy-async",
"aws-smithy-http",
"aws-smithy-json 0.60.7",
"aws-smithy-json",
"aws-smithy-runtime",
"aws-smithy-runtime-api",
"aws-smithy-types",
@@ -342,9 +342,9 @@ dependencies = [
[[package]]
name = "aws-runtime"
version = "1.4.4"
version = "1.5.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b5ac934720fbb46206292d2c75b57e67acfc56fe7dfd34fb9a02334af08409ea"
checksum = "bee7643696e7fdd74c10f9eb42848a87fe469d35eae9c3323f80aa98f350baac"
dependencies = [
"aws-credential-types",
"aws-sigv4",
@@ -376,7 +376,7 @@ dependencies = [
"aws-runtime",
"aws-smithy-async",
"aws-smithy-http",
"aws-smithy-json 0.61.1",
"aws-smithy-json",
"aws-smithy-query",
"aws-smithy-runtime",
"aws-smithy-runtime-api",
@@ -399,7 +399,7 @@ dependencies = [
"aws-runtime",
"aws-smithy-async",
"aws-smithy-http",
"aws-smithy-json 0.61.1",
"aws-smithy-json",
"aws-smithy-runtime",
"aws-smithy-runtime-api",
"aws-smithy-types",
@@ -424,7 +424,7 @@ dependencies = [
"aws-smithy-checksums",
"aws-smithy-eventstream",
"aws-smithy-http",
"aws-smithy-json 0.61.1",
"aws-smithy-json",
"aws-smithy-runtime",
"aws-smithy-runtime-api",
"aws-smithy-types",
@@ -447,15 +447,15 @@ dependencies = [
[[package]]
name = "aws-sdk-sso"
version = "1.50.0"
version = "1.57.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "05ca43a4ef210894f93096039ef1d6fa4ad3edfabb3be92b80908b9f2e4b4eab"
checksum = "c54bab121fe1881a74c338c5f723d1592bf3b53167f80268a1274f404e1acc38"
dependencies = [
"aws-credential-types",
"aws-runtime",
"aws-smithy-async",
"aws-smithy-http",
"aws-smithy-json 0.61.1",
"aws-smithy-json",
"aws-smithy-runtime",
"aws-smithy-runtime-api",
"aws-smithy-types",
@@ -469,15 +469,15 @@ dependencies = [
[[package]]
name = "aws-sdk-ssooidc"
version = "1.51.0"
version = "1.58.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "abaf490c2e48eed0bb8e2da2fb08405647bd7f253996e0f93b981958ea0f73b0"
checksum = "8c8234fd024f7ac61c4e44ea008029bde934250f371efe7d4a39708397b1080c"
dependencies = [
"aws-credential-types",
"aws-runtime",
"aws-smithy-async",
"aws-smithy-http",
"aws-smithy-json 0.61.1",
"aws-smithy-json",
"aws-smithy-runtime",
"aws-smithy-runtime-api",
"aws-smithy-types",
@@ -491,15 +491,15 @@ dependencies = [
[[package]]
name = "aws-sdk-sts"
version = "1.51.0"
version = "1.58.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b68fde0d69c8bfdc1060ea7da21df3e39f6014da316783336deff0a9ec28f4bf"
checksum = "ba60e1d519d6f23a9df712c04fdeadd7872ac911c84b2f62a8bda92e129b7962"
dependencies = [
"aws-credential-types",
"aws-runtime",
"aws-smithy-async",
"aws-smithy-http",
"aws-smithy-json 0.61.1",
"aws-smithy-json",
"aws-smithy-query",
"aws-smithy-runtime",
"aws-smithy-runtime-api",
@@ -514,9 +514,9 @@ dependencies = [
[[package]]
name = "aws-sigv4"
version = "1.2.6"
version = "1.2.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7d3820e0c08d0737872ff3c7c1f21ebbb6693d832312d6152bf18ef50a5471c2"
checksum = "690118821e46967b3c4501d67d7d52dd75106a9c54cf36cefa1985cedbe94e05"
dependencies = [
"aws-credential-types",
"aws-smithy-eventstream",
@@ -543,9 +543,9 @@ dependencies = [
[[package]]
name = "aws-smithy-async"
version = "1.2.1"
version = "1.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "62220bc6e97f946ddd51b5f1361f78996e704677afc518a4ff66b7a72ea1378c"
checksum = "fa59d1327d8b5053c54bf2eaae63bf629ba9e904434d0835a28ed3c0ed0a614e"
dependencies = [
"futures-util",
"pin-project-lite",
@@ -575,9 +575,9 @@ dependencies = [
[[package]]
name = "aws-smithy-eventstream"
version = "0.60.5"
version = "0.60.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cef7d0a272725f87e51ba2bf89f8c21e4df61b9e49ae1ac367a6d69916ef7c90"
checksum = "8b18559a41e0c909b77625adf2b8c50de480a8041e5e4a3f5f7d177db70abc5a"
dependencies = [
"aws-smithy-types",
"bytes",
@@ -586,9 +586,9 @@ dependencies = [
[[package]]
name = "aws-smithy-http"
version = "0.60.11"
version = "0.60.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5c8bc3e8fdc6b8d07d976e301c02fe553f72a39b7a9fea820e023268467d7ab6"
checksum = "7809c27ad8da6a6a68c454e651d4962479e81472aa19ae99e59f9aba1f9713cc"
dependencies = [
"aws-smithy-eventstream",
"aws-smithy-runtime-api",
@@ -607,18 +607,9 @@ dependencies = [
[[package]]
name = "aws-smithy-json"
version = "0.60.7"
version = "0.61.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4683df9469ef09468dad3473d129960119a0d3593617542b7d52086c8486f2d6"
dependencies = [
"aws-smithy-types",
]
[[package]]
name = "aws-smithy-json"
version = "0.61.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ee4e69cc50921eb913c6b662f8d909131bb3e6ad6cb6090d3a39b66fc5c52095"
checksum = "623a51127f24c30776c8b374295f2df78d92517386f77ba30773f15a30ce1422"
dependencies = [
"aws-smithy-types",
]
@@ -635,9 +626,9 @@ dependencies = [
[[package]]
name = "aws-smithy-runtime"
version = "1.7.4"
version = "1.7.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9f20685047ca9d6f17b994a07f629c813f08b5bce65523e47124879e60103d45"
checksum = "865f7050bbc7107a6c98a397a9fcd9413690c27fa718446967cf03b2d3ac517e"
dependencies = [
"aws-smithy-async",
"aws-smithy-http",
@@ -679,9 +670,9 @@ dependencies = [
[[package]]
name = "aws-smithy-types"
version = "1.2.9"
version = "1.2.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4fbd94a32b3a7d55d3806fe27d98d3ad393050439dd05eb53ece36ec5e3d3510"
checksum = "a28f6feb647fb5e0d5b50f0472c19a7db9462b74e2fec01bb0b44eedcc834e97"
dependencies = [
"base64-simd",
"bytes",
@@ -714,9 +705,9 @@ dependencies = [
[[package]]
name = "aws-types"
version = "1.3.3"
version = "1.3.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5221b91b3e441e6675310829fd8984801b772cb1546ef6c0e54dec9f1ac13fef"
checksum = "b0df5a18c4f951c645300d365fec53a61418bcf4650f604f85fe2a665bfaa0c2"
dependencies = [
"aws-credential-types",
"aws-smithy-async",

View File

@@ -10,29 +10,18 @@ ICU_PREFIX_DIR := /usr/local/icu
# environment variable.
#
BUILD_TYPE ?= debug
WITH_SANITIZERS ?= no
ifeq ($(BUILD_TYPE),release)
PG_CONFIGURE_OPTS = --enable-debug --with-openssl
PG_CFLAGS = -O2 -g3 $(CFLAGS)
PG_LDFLAGS = $(LDFLAGS)
# Unfortunately, `--profile=...` is a nightly feature
CARGO_BUILD_FLAGS += --release
else ifeq ($(BUILD_TYPE),debug)
PG_CONFIGURE_OPTS = --enable-debug --with-openssl --enable-cassert --enable-depend
PG_CFLAGS = -O0 -g3 $(CFLAGS)
PG_LDFLAGS = $(LDFLAGS)
else
$(error Bad build type '$(BUILD_TYPE)', see Makefile for options)
endif
ifeq ($(WITH_SANITIZERS),yes)
PG_CFLAGS += -fsanitize=address -fsanitize=undefined -fno-sanitize-recover
COPT += -Wno-error # to avoid failing on warnings induced by sanitizers
PG_LDFLAGS = -fsanitize=address -fsanitize=undefined -static-libasan -static-libubsan $(LDFLAGS)
export CC := gcc
export ASAN_OPTIONS := detect_leaks=0
endif
ifeq ($(shell test -e /home/nonroot/.docker_build && echo -n yes),yes)
# Exclude static build openssl, icu for local build (MacOS, Linux)
# Only keep for build type release and debug
@@ -44,9 +33,7 @@ endif
UNAME_S := $(shell uname -s)
ifeq ($(UNAME_S),Linux)
# Seccomp BPF is only available for Linux
ifneq ($(WITH_SANITIZERS),yes)
PG_CONFIGURE_OPTS += --with-libseccomp
endif
PG_CONFIGURE_OPTS += --with-libseccomp
else ifeq ($(UNAME_S),Darwin)
PG_CFLAGS += -DUSE_PREFETCH
ifndef DISABLE_HOMEBREW
@@ -119,7 +106,7 @@ $(POSTGRES_INSTALL_DIR)/build/%/config.status:
EXTRA_VERSION=$$(cd $(ROOT_PROJECT_DIR)/vendor/postgres-$$VERSION && git rev-parse HEAD); \
(cd $(POSTGRES_INSTALL_DIR)/build/$$VERSION && \
env PATH="$(EXTRA_PATH_OVERRIDES):$$PATH" $(ROOT_PROJECT_DIR)/vendor/postgres-$$VERSION/configure \
CFLAGS='$(PG_CFLAGS)' LDFLAGS='$(PG_LDFLAGS)' \
CFLAGS='$(PG_CFLAGS)' \
$(PG_CONFIGURE_OPTS) --with-extra-version=" ($$EXTRA_VERSION)" \
--prefix=$(abspath $(POSTGRES_INSTALL_DIR))/$$VERSION > configure.log)

View File

@@ -12,8 +12,6 @@ RUN echo 'Acquire::Retries "5";' > /etc/apt/apt.conf.d/80-retries && \
echo -e "retry_connrefused = on\ntimeout=15\ntries=5\n" > /root/.wgetrc && \
echo -e "--retry-connrefused\n--connect-timeout 15\n--retry 5\n--max-time 300\n" > /root/.curlrc
COPY build_tools/patches/pgcopydbv017.patch /pgcopydbv017.patch
RUN if [ "${DEBIAN_VERSION}" = "bookworm" ]; then \
set -e && \
apt update && \
@@ -46,7 +44,6 @@ RUN if [ "${DEBIAN_VERSION}" = "bookworm" ]; then \
mkdir /tmp/pgcopydb && \
tar -xzf /tmp/pgcopydb.tar.gz -C /tmp/pgcopydb --strip-components=1 && \
cd /tmp/pgcopydb && \
patch -p1 < /pgcopydbv017.patch && \
make -s clean && \
make -s -j12 install && \
libpq_path=$(find /lib /usr/lib -name "libpq.so.5" | head -n 1) && \

View File

@@ -1,57 +0,0 @@
diff --git a/src/bin/pgcopydb/copydb.c b/src/bin/pgcopydb/copydb.c
index d730b03..69a9be9 100644
--- a/src/bin/pgcopydb/copydb.c
+++ b/src/bin/pgcopydb/copydb.c
@@ -44,6 +44,7 @@ GUC dstSettings[] = {
{ "synchronous_commit", "'off'" },
{ "statement_timeout", "0" },
{ "lock_timeout", "0" },
+ { "idle_in_transaction_session_timeout", "0" },
{ NULL, NULL },
};
diff --git a/src/bin/pgcopydb/pgsql.c b/src/bin/pgcopydb/pgsql.c
index 94f2f46..e051ba8 100644
--- a/src/bin/pgcopydb/pgsql.c
+++ b/src/bin/pgcopydb/pgsql.c
@@ -2319,6 +2319,11 @@ pgsql_execute_log_error(PGSQL *pgsql,
LinesBuffer lbuf = { 0 };
+ if (message != NULL){
+ // make sure message is writable by splitLines
+ message = strdup(message);
+ }
+
if (!splitLines(&lbuf, message))
{
/* errors have already been logged */
@@ -2332,6 +2337,7 @@ pgsql_execute_log_error(PGSQL *pgsql,
PQbackendPID(pgsql->connection),
lbuf.lines[lineNumber]);
}
+ free(message); // free copy of message we created above
if (pgsql->logSQL)
{
@@ -3174,11 +3180,18 @@ pgcopy_log_error(PGSQL *pgsql, PGresult *res, const char *context)
/* errors have already been logged */
return;
}
-
if (res != NULL)
{
char *sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
- strlcpy(pgsql->sqlstate, sqlstate, sizeof(pgsql->sqlstate));
+ if (sqlstate == NULL)
+ {
+ // PQresultErrorField returned NULL!
+ pgsql->sqlstate[0] = '\0'; // Set to an empty string to avoid segfault
+ }
+ else
+ {
+ strlcpy(pgsql->sqlstate, sqlstate, sizeof(pgsql->sqlstate));
+ }
}
char *endpoint =

View File

@@ -47,9 +47,7 @@ files:
# Allow postgres user (which is what compute_ctl runs as) to run /neonvm/bin/resize-swap
# and /neonvm/bin/set-disk-quota as root without requiring entering a password (NOPASSWD),
# regardless of hostname (ALL)
#
# Also allow it to shut down the VM. The fast_import job does that when it's finished.
postgres ALL=(root) NOPASSWD: /neonvm/bin/resize-swap, /neonvm/bin/set-disk-quota, /neonvm/bin/poweroff
postgres ALL=(root) NOPASSWD: /neonvm/bin/resize-swap, /neonvm/bin/set-disk-quota
- filename: cgconfig.conf
content: |
# Configuration for cgroups in VM compute nodes

View File

@@ -231,14 +231,6 @@ pub(crate) async fn main() -> anyhow::Result<()> {
])
.env_clear()
.env("LD_LIBRARY_PATH", &pg_lib_dir)
.env(
"ASAN_OPTIONS",
std::env::var("ASAN_OPTIONS").unwrap_or_default(),
)
.env(
"UBSAN_OPTIONS",
std::env::var("UBSAN_OPTIONS").unwrap_or_default(),
)
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::piped())
.spawn()

View File

@@ -261,13 +261,7 @@ fn fill_rust_env_vars(cmd: &mut Command) -> &mut Command {
let mut filled_cmd = cmd.env_clear().env("RUST_BACKTRACE", backtrace_setting);
// Pass through these environment variables to the command
for var in [
"LLVM_PROFILE_FILE",
"FAILPOINTS",
"RUST_LOG",
"ASAN_OPTIONS",
"UBSAN_OPTIONS",
] {
for var in ["LLVM_PROFILE_FILE", "FAILPOINTS", "RUST_LOG"] {
if let Some(val) = std::env::var_os(var) {
filled_cmd = filled_cmd.env(var, val);
}

View File

@@ -221,17 +221,7 @@ impl StorageController {
"-p",
&format!("{}", postgres_port),
];
let pg_lib_dir = self.get_pg_lib_dir().await.unwrap();
let envs = [
("LD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()),
("DYLD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()),
];
let exitcode = Command::new(bin_path)
.args(args)
.envs(envs)
.spawn()?
.wait()
.await?;
let exitcode = Command::new(bin_path).args(args).spawn()?.wait().await?;
Ok(exitcode.success())
}
@@ -252,11 +242,6 @@ impl StorageController {
let pg_bin_dir = self.get_pg_bin_dir().await?;
let createdb_path = pg_bin_dir.join("createdb");
let pg_lib_dir = self.get_pg_lib_dir().await.unwrap();
let envs = [
("LD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()),
("DYLD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()),
];
let output = Command::new(&createdb_path)
.args([
"-h",
@@ -269,7 +254,6 @@ impl StorageController {
&username(),
DB_NAME,
])
.envs(envs)
.output()
.await
.expect("Failed to spawn createdb");

View File

@@ -2,4 +2,4 @@
set -ex
cd "$(dirname ${0})"
patch -p1 <test-upgrade.patch
pg_prove -d contrib_regression test.sql
pg_prove test.sql

View File

@@ -285,10 +285,10 @@ To summarize, list of cplane changes:
### storage_controller implementation
If desired, we may continue using current 'load everything on startup and keep
in memory' approach: single timeline shouldn't take more than 100 bytes (it's 16
byte tenant_id, 16 byte timeline_id, int generation, vec of ~3 safekeeper ids
plus some flags), so 10^6 of timelines shouldn't take more than 100MB.
Current 'load everything on startup and keep in memory' easy design is fine.
Single timeline shouldn't take more than 100 bytes (it's 16 byte tenant_id, 16
byte timeline_id, int generation, vec of ~3 safekeeper ids plus some flags), so
10^6 of timelines shouldn't take more than 100MB.
Similar to pageserver attachment Intents storage_controller would have in-memory
`MigrationRequest` (or its absense) for each timeline and pool of tasks trying
@@ -296,7 +296,7 @@ to make these request reality; this ensures one instance of storage_controller
won't do several migrations on the same timeline concurrently. In the first
version it is simpler to have more manual control and no retries, i.e. migration
failure removes the request. Later we can build retries and automatic
scheduling/migration around. `MigrationRequest` is
scheduling/migration. `MigrationRequest` is
```
enum MigrationRequest {
To(Vec<NodeId>),
@@ -313,9 +313,9 @@ similarly, in the first version it is ok to trigger it manually).
#### Schema
`safekeepers` table mirroring current `nodes` should be added, except that for
`scheduling_policy`: it is enough to have at least in the beginning only 3
fields: 1) `active` 2) `paused` (initially means only not assign new tlis there
3) `decomissioned` (node is removed).
`scheduling_policy` field (seems like `status` is a better name for it): it is enough
to have at least in the beginning only 3 fields: 1) `active` 2) `offline` 3)
`decomissioned`.
`timelines` table:
```
@@ -324,24 +324,18 @@ table! {
timelines (tenant_id, timeline_id) {
timeline_id -> Varchar,
tenant_id -> Varchar,
start_lsn -> pg_lsn,
generation -> Int4,
sk_set -> Array<Int4>, // list of safekeeper ids
new_sk_set -> Nullable<Array<Int8>>, // list of safekeeper ids, null if not joint conf
new_sk_set -> Nullable<Array<Int4>>, // list of safekeeper ids, null if not joint conf
cplane_notified_generation -> Int4,
deleted_at -> Nullable<Timestamptz>,
}
}
```
`start_lsn` is needed to create timeline on safekeepers properly, see below. We
might also want to add ancestor_timeline_id to preserve the hierarchy, but for
this RFC it is not needed.
#### API
Node management is similar to pageserver:
1) POST `/control/v1/safekeepers` inserts safekeeper.
1) POST `/control/v1/safekeepers` upserts safekeeper.
2) GET `/control/v1/safekeepers` lists safekeepers.
3) GET `/control/v1/safekeepers/:node_id` gets safekeeper.
4) PUT `/control/v1/safekepers/:node_id/status` changes status to e.g.
@@ -351,15 +345,25 @@ Node management is similar to pageserver:
Safekeeper deploy scripts should register safekeeper at storage_contorller as
they currently do with cplane, under the same id.
Timeline creation/deletion will work through already existing POST and DELETE
`tenant/:tenant_id/timeline`. Cplane is expected to retry both until they
succeed. See next section on the implementation details.
Timeline creation/deletion: already existing POST `tenant/:tenant_id/timeline`
would 1) choose initial set of safekeepers; 2) write to the db initial
`Configuration` with `INSERT ON CONFLICT DO NOTHING` returning existing row in
case of conflict; 3) create timeline on the majority of safekeepers (already
created is ok).
We don't want to block timeline creation/deletion when one safekeeper is down.
Currently this is crutched by compute implicitly creating timeline on any
safekeeper it is connected to. This creates ugly timeline state on safekeeper
when timeline is created, but start LSN is not defined yet. Next section
describes dealing with this.
We don't want to block timeline creation when one safekeeper is down. Currently
this is solved by compute implicitly creating timeline on any safekeeper it is
connected to. This creates ugly timeline state on safekeeper when timeline is
created, but start LSN is not defined yet. It would be nice to remove this; to
do that, controller can in the background retry to create timeline on
safekeeper(s) which missed that during initial creation call. It can do that
through `pull_timeline` from majority so it doesn't need to remember
`parent_lsn` in its db.
Timeline deletion removes the row from the db and forwards deletion to the
current configuration members. Without additional actions deletions might leak,
see below on this; initially let's ignore these, reporting to cplane success if
at least one safekeeper deleted the timeline (this will remove s3 data).
Tenant deletion repeats timeline deletion for all timelines.
@@ -391,6 +395,26 @@ Similar call should be added for the tenant.
It would be great to have some way of subscribing to the results (apart from
looking at logs/metrics).
Migration is executed as described above. One subtlety is that (local) deletion on
source safekeeper might fail, which is not a problem if we are going to
decomission the node but leaves garbage otherwise. I'd propose in the first version
1) Don't attempt deletion at all if node status is `offline`.
2) If it failed, just issue warning.
And add PUT `/control/v1/safekeepers/:node_id/scrub` endpoint which would find and
remove garbage timelines for manual use. It will 1) list all timelines on the
safekeeper 2) compare each one against configuration storage: if timeline
doesn't exist at all (had been deleted), it can be deleted. Otherwise, it can
be deleted under generation number if node is not member of current generation.
Automating this is untrivial; we'd need to register all potential missing
deletions <tenant_id, timeline_id, generation, node_id> in the same transaction
which switches configurations. Similarly when timeline is fully deleted to
prevent cplane operation from blocking when some safekeeper is not available
deletion should be also registered.
One more task pool should infinitely retry notifying control plane about changed
safekeeper sets.
3) GET `/control/v1/tenant/:tenant_id/timeline/:timeline_id/` should return
current in memory state of the timeline and pending `MigrationRequest`,
if any.
@@ -399,153 +423,12 @@ looking at logs/metrics).
migration by switching configuration from the joint to the one with (previous) `sk_set` under CAS
(incrementing generation as always).
#### API implementation and reconciliation
For timeline creation/deletion we want to preserve the basic assumption that
unreachable minority (1 sk of 3) doesn't block their completion, but eventually
we want to finish creation/deletion on nodes which missed it (unless they are
removed). Similarly for migration; it may and should finish even though excluded
members missed their exclusion. And of course e.g. such pending exclusion on
node C after migration ABC -> ABD must not prevent next migration ABD -> ABE. As
another example, if some node missed timeline creation it clearly must not block
migration from it. Hence it is natural to have per safekeeper background
reconciler which retries these ops until they succeed. There are 3 possible
operation types, and the type is defined by timeline state (membership
configuration and whether it is deleted) and safekeeper id: we may need to
create timeline on sk (node added), locally delete it (node excluded, somewhat
similar to detach) or globally delete it (timeline is deleted).
Next, on storage controller restart in principle these pending operations can be
figured out by comparing safekeepers state against storcon state. But it seems
better to me to materialize them in the database; it is not expensive, avoids
these startup scans which themselves can fail etc and makes it very easy to see
outstanding work directly at the source of truth -- the db. So we can add table
`safekeeper_timeline_pending_ops`
```
table! {
// timeline_id, sk_id is primary key
safekeeper_timeline_pending_ops (sk_id, tenant_id, timeline_id) {
sk_id -> int8,
tenant_id -> Varchar,
timeline_id -> Varchar,
generation -> Int4,
op_type -> Varchar,
}
}
```
`op_type` can be `include` (seed from peers and ensure generation is up to
date), `exclude` (remove locally) and `delete`. Field is actually not strictly
needed as it can be computed from current configuration, but gives more explicit
observability.
`generation` is necessary there because after op is done reconciler must remove
it and not remove another row with higher gen which in theory might appear.
Any insert of row should overwrite (remove) all rows with the same sk and
timeline id but lower `generation` as next op makes previous obsolete. Insertion
of `op_type` `delete` overwrites all rows.
About `exclude`: rather than adding explicit safekeeper http endpoint, it is
reasonable to reuse membership switch endpoint: if safekeeper is not member
of the configuration it locally removes the timeline on the switch. In this case
404 should also be considered an 'ok' answer by the caller.
So, main loop of per sk reconcile reads `safekeeper_timeline_pending_ops`
joined with timeline configuration to get current conf (with generation `n`)
for the safekeeper and does the jobs, infinitely retrying failures:
1) If node is member (`include`):
- Check if timeline exists on it, if not, call pull_timeline on it from
other members
- Call switch configuration to the current
2) If node is not member (`exclude`):
- Call switch configuration to the current, 404 is ok.
3) If timeline is deleted (`delete`), call delete.
In cases 1 and 2 remove `safekeeper_timeline_pending_ops` for the sk and
timeline with generation <= `n` if `op_type` is not `delete`.
In case 3 also remove `safekeeper_timeline_pending_ops`
entry + remove `timelines` entry if there is nothing left in `safekeeper_timeline_pending_ops` for the timeline.
Let's consider in details how APIs can be implemented from this angle.
Timeline creation. It is assumed that cplane retries it until success, so all
actions must be idempotent. Now, a tricky point here is timeline start LSN. For
the initial (tenant creation) call cplane doesn't know it. However, setting
start_lsn on safekeepers during creation is a good thing -- it provides a
guarantee that walproposer can always find a common point in WAL histories of
safekeeper and its own, and so absense of it would be a clear sign of
corruption. The following sequence works:
1) Create timeline (or observe that it exists) on pageserver,
figuring out last_record_lsn in response.
2) Choose safekeepers and insert (ON CONFLICT DO NOTHING) timeline row into the
db. Note that last_record_lsn returned on the previous step is movable as it
changes once ingestion starts, insert must not overwrite it (as well as other
fields like membership conf). On the contrary, start_lsn used in the next
step must be set to the value in the db. cplane_notified_generation can be set
to 1 (initial generation) in insert to avoid notifying cplane about initial
conf as cplane will receive it in timeline creation request anyway.
3) Issue timeline creation calls to at least majority of safekeepers. Using
majority here is not necessary but handy because it guarantees that any live
majority will have at least one sk with created timeline and so
reconciliation task can use pull_timeline shared with migration instead of
create timeline special init case. OFC if timeline is already exists call is
ignored.
4) For minority of safekeepers which could have missed creation insert
entries to `safekeeper_timeline_pending_ops`. We won't miss this insertion
because response to cplane is sent only after it has happened, and cplane
retries the call until 200 response.
There is a small question how request handler (timeline creation in this
case) would interact with per sk reconciler. As always I prefer to do the
simplest possible thing and here it seems to be just waking it up so it
re-reads the db for work to do. Passing work in memory is faster, but
that shouldn't matter, and path to scan db for work will exist anyway,
simpler to reuse it.
For pg version / wal segment size: while we may persist them in `timelines`
table, it is not necessary as initial creation at step 3 can take them from
pageserver or cplane creation call and later pull_timeline will carry them
around.
Timeline migration.
1) CAS to the db to create joint conf, and in the same transaction create
`safekeeper_timeline_pending_ops` `include` entries to initialize new members
as well as deliver this conf to current ones; poke per sk reconcilers to work
on it. Also any conf change should also poke cplane notifier task(s).
2) Once it becomes possible per alg description above, get out of joint conf
with another CAS. Task should get wakeups from per sk reconcilers because
conf switch is required for advancement; however retries should be sleep
based as well as LSN advancement might be needed, though in happy path
it isn't. To see whether further transition is possible on wakup migration
executor polls safekeepers per the algorithm. CAS creating new conf with only
new members should again insert entries to `safekeeper_timeline_pending_ops`
to switch them there, as well as `exclude` rows to remove timeline from
old members.
Timeline deletion: just set `deleted_at` on the timeline row and insert
`safekeeper_timeline_pending_ops` entries in the same xact, the rest is done by
per sk reconcilers.
When node is removed (set to `decomissioned`), `safekeeper_timeline_pending_ops`
for it must be cleared in the same transaction.
One more task pool should infinitely retry notifying control plane about changed
safekeeper sets (trying making `cplane_notified_generation` equal `generation`).
#### Dealing with multiple instances of storage_controller
Operations described above executed concurrently might create some errors but do
not prevent progress, so while we normally don't want to run multiple instances
of storage_controller it is fine to have it temporarily, e.g. during redeploy.
To harden against some controller instance creating some work in
`safekeeper_timeline_pending_ops` and then disappearing without anyone pickup up
the job per sk reconcilers apart from explicit wakups should scan for work
periodically. It is possible to remove that though if all db updates are
protected with leadership token/term -- then such scans are needed only after
leadership is acquired.
Any interactions with db update in-memory controller state, e.g. if migration
request failed because different one is in progress, controller remembers that
and tries to finish it.
@@ -662,7 +545,7 @@ Aurora does this but similarly I don't think this is needed.
We should use Compute <-> safekeeper protocol change to include other (long
yearned) modifications:
- send data in network order without putting whole structs to be arch independent
- send data in network order to make arm work.
- remove term_start_lsn from AppendRequest
- add horizon to TermHistory
- add to ProposerGreeting number of connection from this wp to sk

View File

@@ -207,11 +207,11 @@ impl RemoteExtSpec {
if !self
.public_extensions
.as_ref()
.is_some_and(|exts| exts.iter().any(|e| e == real_ext_name))
.is_some_and(|exts| exts.iter().any(|e| e == ext_name))
&& !self
.custom_extensions
.as_ref()
.is_some_and(|exts| exts.iter().any(|e| e == real_ext_name))
.is_some_and(|exts| exts.iter().any(|e| e == ext_name))
{
return Err(anyhow::anyhow!("extension {} is not found", real_ext_name));
}
@@ -414,7 +414,7 @@ mod tests {
"public_extensions": ["ext"],
"custom_extensions": [],
"library_index": {
"extlib": "ext",
"ext": "ext"
},
"extension_data": {
"ext": {
@@ -430,12 +430,6 @@ mod tests {
rspec
.get_ext("ext", false, "latest", "v17")
.expect("Extension should be found");
// test library index for the case when library name
// doesn't match the extension name
rspec
.get_ext("extlib", true, "latest", "v17")
.expect("Library should be found");
}
#[test]

View File

@@ -94,7 +94,6 @@ pub struct ConfigToml {
pub ondemand_download_behavior_treat_error_as_warn: bool,
#[serde(with = "humantime_serde")]
pub background_task_maximum_delay: Duration,
pub use_compaction_semaphore: bool,
pub control_plane_api: Option<reqwest::Url>,
pub control_plane_api_token: Option<String>,
pub control_plane_emergency_mode: bool,
@@ -471,7 +470,6 @@ impl Default for ConfigToml {
DEFAULT_BACKGROUND_TASK_MAXIMUM_DELAY,
)
.unwrap()),
use_compaction_semaphore: false,
control_plane_api: (None),
control_plane_api_token: (None),

View File

@@ -76,15 +76,7 @@ impl Conf {
let mut cmd = Command::new(path);
cmd.env_clear()
.env("LD_LIBRARY_PATH", self.pg_lib_dir()?)
.env("DYLD_LIBRARY_PATH", self.pg_lib_dir()?)
.env(
"ASAN_OPTIONS",
std::env::var("ASAN_OPTIONS").unwrap_or_default(),
)
.env(
"UBSAN_OPTIONS",
std::env::var("UBSAN_OPTIONS").unwrap_or_default(),
);
.env("DYLD_LIBRARY_PATH", self.pg_lib_dir()?);
Ok(cmd)
}

View File

@@ -64,14 +64,6 @@ pub async fn do_run_initdb(args: RunInitdbArgs<'_>) -> Result<(), Error> {
.env_clear()
.env("LD_LIBRARY_PATH", library_search_path)
.env("DYLD_LIBRARY_PATH", library_search_path)
.env(
"ASAN_OPTIONS",
std::env::var("ASAN_OPTIONS").unwrap_or_default(),
)
.env(
"UBSAN_OPTIONS",
std::env::var("UBSAN_OPTIONS").unwrap_or_default(),
)
.stdin(std::process::Stdio::null())
// stdout invocation produces the same output every time, we don't need it
.stdout(std::process::Stdio::null())

View File

@@ -7,7 +7,7 @@ use serde::{Deserialize, Serialize};
use crate::{
DEFAULT_MAX_KEYS_PER_LIST_RESPONSE, DEFAULT_REMOTE_STORAGE_AZURE_CONCURRENCY_LIMIT,
DEFAULT_REMOTE_STORAGE_LOCALFS_CONCURRENCY_LIMIT, DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT,
DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT,
};
/// External backup storage configuration, enough for creating a client for that storage.
@@ -45,11 +45,11 @@ impl RemoteStorageKind {
impl RemoteStorageConfig {
/// Helper to fetch the configured concurrency limit.
pub fn concurrency_limit(&self) -> usize {
pub fn concurrency_limit(&self) -> Option<usize> {
match &self.storage {
RemoteStorageKind::LocalFs { .. } => DEFAULT_REMOTE_STORAGE_LOCALFS_CONCURRENCY_LIMIT,
RemoteStorageKind::AwsS3(c) => c.concurrency_limit.into(),
RemoteStorageKind::AzureContainer(c) => c.concurrency_limit.into(),
RemoteStorageKind::LocalFs { .. } => None,
RemoteStorageKind::AwsS3(c) => Some(c.concurrency_limit.into()),
RemoteStorageKind::AzureContainer(c) => Some(c.concurrency_limit.into()),
}
}
}

View File

@@ -65,12 +65,6 @@ pub const DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT: usize = 100;
/// Here, a limit of max 20k concurrent connections was noted.
/// <https://learn.microsoft.com/en-us/answers/questions/1301863/is-there-any-limitation-to-concurrent-connections>
pub const DEFAULT_REMOTE_STORAGE_AZURE_CONCURRENCY_LIMIT: usize = 100;
/// Set this limit analogously to the S3 limit.
///
/// The local filesystem backend doesn't enforce a concurrency limit itself, but this also bounds
/// the upload queue concurrency. Some tests create thousands of uploads, which slows down the
/// quadratic scheduling of the upload queue, and there is no point spawning so many Tokio tasks.
pub const DEFAULT_REMOTE_STORAGE_LOCALFS_CONCURRENCY_LIMIT: usize = 100;
/// No limits on the client side, which currenltly means 1000 for AWS S3.
/// <https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html#API_ListObjectsV2_RequestSyntax>
pub const DEFAULT_MAX_KEYS_PER_LIST_RESPONSE: Option<i32> = None;

View File

@@ -39,7 +39,7 @@ function initdb_with_args {
;;
esac
eval env -i LD_LIBRARY_PATH="$PG_BIN"/../lib ASAN_OPTIONS="${ASAN_OPTIONS-}" UBSAN_OPTIONS="${UBSAN_OPTIONS-}" "${cmd[*]}"
eval env -i LD_LIBRARY_PATH="$PG_BIN"/../lib "${cmd[*]}"
}
rm -fr "$DATA_DIR"

View File

@@ -8,22 +8,19 @@ use strum_macros::{EnumString, VariantNames};
/// Logs a critical error, similarly to `tracing::error!`. This will:
///
/// * Emit an ERROR log message with prefix "CRITICAL:" and a backtrace.
/// * Trigger a pageable alert (via the metric below).
/// * Increment libmetrics_tracing_event_count{level="critical"}, and indirectly level="error".
/// * Trigger a pageable alert (via the metric above).
/// * In debug builds, panic the process.
///
/// When including errors in the message, please use {err:?} to include the error cause and original
/// backtrace.
#[macro_export]
macro_rules! critical {
($($arg:tt)*) => {{
($($arg:tt)*) => {
if cfg!(debug_assertions) {
panic!($($arg)*);
}
$crate::logging::TRACING_EVENT_COUNT_METRIC.inc_critical();
let backtrace = std::backtrace::Backtrace::capture();
tracing::error!("CRITICAL: {}\n{backtrace}", format!($($arg)*));
}};
};
}
#[derive(EnumString, strum_macros::Display, VariantNames, Eq, PartialEq, Debug, Clone, Copy)]

View File

@@ -140,10 +140,6 @@ pub struct PageServerConf {
/// not terrible.
pub background_task_maximum_delay: Duration,
/// If true, use a separate semaphore for compaction tasks instead of the common background task
/// semaphore. Defaults to false.
pub use_compaction_semaphore: bool,
pub control_plane_api: Option<Url>,
/// JWT token for use with the control plane API.
@@ -336,7 +332,6 @@ impl PageServerConf {
test_remote_failures,
ondemand_download_behavior_treat_error_as_warn,
background_task_maximum_delay,
use_compaction_semaphore,
control_plane_api,
control_plane_api_token,
control_plane_emergency_mode,
@@ -390,7 +385,6 @@ impl PageServerConf {
test_remote_failures,
ondemand_download_behavior_treat_error_as_warn,
background_task_maximum_delay,
use_compaction_semaphore,
control_plane_api,
control_plane_emergency_mode,
heatmap_upload_concurrency,

View File

@@ -8,6 +8,7 @@ use std::time::Duration;
use crate::controller_upcall_client::ControlPlaneGenerationsApi;
use crate::metrics;
use crate::tenant::remote_timeline_client::remote_layer_path;
use crate::tenant::remote_timeline_client::remote_timeline_path;
use crate::tenant::remote_timeline_client::LayerFileMetadata;
use crate::virtual_file::MaybeFatalIo;
@@ -462,18 +463,45 @@ impl DeletionQueueClient {
///
/// The `current_generation` is the generation of this pageserver's current attachment. The
/// generations in `layers` are the generations in which those layers were written.
pub(crate) fn push_layers(
pub(crate) async fn push_layers(
&self,
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
current_generation: Generation,
layers: Vec<(LayerName, LayerFileMetadata)>,
) -> Result<(), DeletionQueueError> {
// None generations are not valid for attached tenants: they must always be attached in
// a known generation. None generations are still permitted for layers in the index because
// they may be historical.
assert!(!current_generation.is_none());
if current_generation.is_none() {
debug!("Enqueuing deletions in legacy mode, skipping queue");
let mut layer_paths = Vec::new();
for (layer, meta) in layers {
layer_paths.push(remote_layer_path(
&tenant_shard_id.tenant_id,
&timeline_id,
meta.shard,
&layer,
meta.generation,
));
}
self.push_immediate(layer_paths).await?;
return self.flush_immediate().await;
}
self.push_layers_sync(tenant_shard_id, timeline_id, current_generation, layers)
}
/// When a Tenant has a generation, push_layers is always synchronous because
/// the ListValidator channel is an unbounded channel.
///
/// This can be merged into push_layers when we remove the Generation-less mode
/// support (`<https://github.com/neondatabase/neon/issues/5395>`)
pub(crate) fn push_layers_sync(
&self,
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
current_generation: Generation,
layers: Vec<(LayerName, LayerFileMetadata)>,
) -> Result<(), DeletionQueueError> {
metrics::DELETION_QUEUE
.keys_submitted
.inc_by(layers.len() as u64);
@@ -929,12 +957,14 @@ mod test {
// File should still be there after we push it to the queue (we haven't pushed enough to flush anything)
info!("Pushing");
client.push_layers(
tenant_shard_id,
TIMELINE_ID,
now_generation,
[(layer_file_name_1.clone(), layer_metadata)].to_vec(),
)?;
client
.push_layers(
tenant_shard_id,
TIMELINE_ID,
now_generation,
[(layer_file_name_1.clone(), layer_metadata)].to_vec(),
)
.await?;
assert_remote_files(&[&remote_layer_file_name_1], &remote_timeline_path);
assert_local_files(&[], &deletion_prefix);
@@ -987,12 +1017,14 @@ mod test {
assert_remote_files(&[&remote_layer_name], &remote_timeline_path);
tracing::debug!("Pushing...");
client.push_layers(
tenant_shard_id,
TIMELINE_ID,
stale_generation,
[(EXAMPLE_LAYER_NAME.clone(), layer_metadata.clone())].to_vec(),
)?;
client
.push_layers(
tenant_shard_id,
TIMELINE_ID,
stale_generation,
[(EXAMPLE_LAYER_NAME.clone(), layer_metadata.clone())].to_vec(),
)
.await?;
// We enqueued the operation in a stale generation: it should have failed validation
tracing::debug!("Flushing...");
@@ -1000,12 +1032,14 @@ mod test {
assert_remote_files(&[&remote_layer_name], &remote_timeline_path);
tracing::debug!("Pushing...");
client.push_layers(
tenant_shard_id,
TIMELINE_ID,
latest_generation,
[(EXAMPLE_LAYER_NAME.clone(), layer_metadata.clone())].to_vec(),
)?;
client
.push_layers(
tenant_shard_id,
TIMELINE_ID,
latest_generation,
[(EXAMPLE_LAYER_NAME.clone(), layer_metadata.clone())].to_vec(),
)
.await?;
// We enqueued the operation in a fresh generation: it should have passed validation
tracing::debug!("Flushing...");
@@ -1040,24 +1074,28 @@ mod test {
// generation gets that treatment)
let remote_layer_file_name_historical =
ctx.write_remote_layer(EXAMPLE_LAYER_NAME, layer_generation)?;
client.push_layers(
tenant_shard_id,
TIMELINE_ID,
now_generation.previous(),
[(EXAMPLE_LAYER_NAME.clone(), layer_metadata.clone())].to_vec(),
)?;
client
.push_layers(
tenant_shard_id,
TIMELINE_ID,
now_generation.previous(),
[(EXAMPLE_LAYER_NAME.clone(), layer_metadata.clone())].to_vec(),
)
.await?;
// Inject a deletion in the generation before generation_now: after restart,
// this deletion should get executed, because we execute deletions in the
// immediately previous generation on the same node.
let remote_layer_file_name_previous =
ctx.write_remote_layer(EXAMPLE_LAYER_NAME_ALT, layer_generation)?;
client.push_layers(
tenant_shard_id,
TIMELINE_ID,
now_generation,
[(EXAMPLE_LAYER_NAME_ALT.clone(), layer_metadata.clone())].to_vec(),
)?;
client
.push_layers(
tenant_shard_id,
TIMELINE_ID,
now_generation,
[(EXAMPLE_LAYER_NAME_ALT.clone(), layer_metadata.clone())].to_vec(),
)
.await?;
client.flush().await?;
assert_remote_files(
@@ -1101,7 +1139,6 @@ pub(crate) mod mock {
use tracing::info;
use super::*;
use crate::tenant::remote_timeline_client::remote_layer_path;
use std::sync::atomic::{AtomicUsize, Ordering};
pub struct ConsumerState {

View File

@@ -61,7 +61,6 @@ use crate::{
remote_timeline_client::LayerFileMetadata,
secondary::SecondaryTenant,
storage_layer::{AsLayerDesc, EvictionError, Layer, LayerName, LayerVisibilityHint},
tasks::sleep_random,
},
CancellableTask, DiskUsageEvictionTask,
};
@@ -211,8 +210,14 @@ async fn disk_usage_eviction_task(
info!("disk usage based eviction task finishing");
};
if sleep_random(task_config.period, &cancel).await.is_err() {
return;
use crate::tenant::tasks::random_init_delay;
{
if random_init_delay(task_config.period, &cancel)
.await
.is_err()
{
return;
}
}
let mut iteration_no = 0;

View File

@@ -6,7 +6,7 @@ use std::sync::{Arc, Mutex};
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
use enum_map::{Enum as _, EnumMap};
use enum_map::EnumMap;
use futures::Future;
use metrics::{
register_counter_vec, register_gauge_vec, register_histogram, register_histogram_vec,
@@ -104,7 +104,7 @@ pub(crate) static STORAGE_TIME_COUNT_PER_TIMELINE: Lazy<IntCounterVec> = Lazy::n
.expect("failed to define a metric")
});
// Buckets for background operation duration in seconds, like compaction, GC, size calculation.
// Buckets for background operations like compaction, GC, size calculation
const STORAGE_OP_BUCKETS: &[f64] = &[0.010, 0.100, 1.0, 10.0, 100.0, 1000.0];
pub(crate) static STORAGE_TIME_GLOBAL: Lazy<HistogramVec> = Lazy::new(|| {
@@ -236,7 +236,7 @@ pub(crate) static GET_VECTORED_LATENCY: Lazy<GetVectoredLatency> = Lazy::new(||
GetVectoredLatency {
map: EnumMap::from_array(std::array::from_fn(|task_kind_idx| {
let task_kind = TaskKind::from_usize(task_kind_idx);
let task_kind = <TaskKind as enum_map::Enum>::from_usize(task_kind_idx);
if GetVectoredLatency::TRACKED_TASK_KINDS.contains(&task_kind) {
let task_kind = task_kind.into();
@@ -259,7 +259,7 @@ pub(crate) static SCAN_LATENCY: Lazy<ScanLatency> = Lazy::new(|| {
ScanLatency {
map: EnumMap::from_array(std::array::from_fn(|task_kind_idx| {
let task_kind = TaskKind::from_usize(task_kind_idx);
let task_kind = <TaskKind as enum_map::Enum>::from_usize(task_kind_idx);
if ScanLatency::TRACKED_TASK_KINDS.contains(&task_kind) {
let task_kind = task_kind.into();
@@ -300,10 +300,10 @@ static PAGE_CACHE_READ_ACCESSES: Lazy<IntCounterVec> = Lazy::new(|| {
pub(crate) static PAGE_CACHE: Lazy<PageCacheMetrics> = Lazy::new(|| PageCacheMetrics {
map: EnumMap::from_array(std::array::from_fn(|task_kind| {
let task_kind = TaskKind::from_usize(task_kind);
let task_kind = <TaskKind as enum_map::Enum>::from_usize(task_kind);
let task_kind: &'static str = task_kind.into();
EnumMap::from_array(std::array::from_fn(|content_kind| {
let content_kind = PageContentKind::from_usize(content_kind);
let content_kind = <PageContentKind as enum_map::Enum>::from_usize(content_kind);
let content_kind: &'static str = content_kind.into();
PageCacheMetricsForTaskKind {
read_accesses_immutable: {
@@ -1913,7 +1913,7 @@ pub(crate) static COMPUTE_COMMANDS_COUNTERS: Lazy<ComputeCommandCounters> = Lazy
ComputeCommandCounters {
map: EnumMap::from_array(std::array::from_fn(|i| {
let command = ComputeCommandKind::from_usize(i);
let command = <ComputeCommandKind as enum_map::Enum>::from_usize(i);
let command_str: &'static str = command.into();
inner.with_label_values(&[command_str])
})),
@@ -2213,13 +2213,11 @@ pub(crate) static TENANT_TASK_EVENTS: Lazy<IntCounterVec> = Lazy::new(|| {
pub struct BackgroundLoopSemaphoreMetrics {
counters: EnumMap<BackgroundLoopKind, IntCounterPair>,
durations: EnumMap<BackgroundLoopKind, Histogram>,
waiting_tasks: EnumMap<BackgroundLoopKind, IntGauge>,
running_tasks: EnumMap<BackgroundLoopKind, IntGauge>,
durations: EnumMap<BackgroundLoopKind, Counter>,
}
pub(crate) static BACKGROUND_LOOP_SEMAPHORE: Lazy<BackgroundLoopSemaphoreMetrics> =
Lazy::new(|| {
pub(crate) static BACKGROUND_LOOP_SEMAPHORE: Lazy<BackgroundLoopSemaphoreMetrics> = Lazy::new(
|| {
let counters = register_int_counter_pair_vec!(
"pageserver_background_loop_semaphore_wait_start_count",
"Counter for background loop concurrency-limiting semaphore acquire calls started",
@@ -2229,101 +2227,45 @@ pub(crate) static BACKGROUND_LOOP_SEMAPHORE: Lazy<BackgroundLoopSemaphoreMetrics
)
.unwrap();
let durations = register_histogram_vec!(
"pageserver_background_loop_semaphore_wait_seconds",
"Seconds spent waiting on background loop semaphore acquisition",
&["task"],
vec![0.01, 1.0, 5.0, 10.0, 30.0, 60.0, 180.0, 300.0, 600.0],
)
.unwrap();
let waiting_tasks = register_int_gauge_vec!(
"pageserver_background_loop_semaphore_waiting_tasks",
"Number of background loop tasks waiting for semaphore",
&["task"],
)
.unwrap();
let running_tasks = register_int_gauge_vec!(
"pageserver_background_loop_semaphore_running_tasks",
"Number of background loop tasks running concurrently",
let durations = register_counter_vec!(
"pageserver_background_loop_semaphore_wait_duration_seconds",
"Sum of wall clock time spent waiting on the background loop concurrency-limiting semaphore acquire calls",
&["task"],
)
.unwrap();
BackgroundLoopSemaphoreMetrics {
counters: EnumMap::from_array(std::array::from_fn(|i| {
let kind = BackgroundLoopKind::from_usize(i);
counters: enum_map::EnumMap::from_array(std::array::from_fn(|i| {
let kind = <BackgroundLoopKind as enum_map::Enum>::from_usize(i);
counters.with_label_values(&[kind.into()])
})),
durations: EnumMap::from_array(std::array::from_fn(|i| {
let kind = BackgroundLoopKind::from_usize(i);
durations: enum_map::EnumMap::from_array(std::array::from_fn(|i| {
let kind = <BackgroundLoopKind as enum_map::Enum>::from_usize(i);
durations.with_label_values(&[kind.into()])
})),
waiting_tasks: EnumMap::from_array(std::array::from_fn(|i| {
let kind = BackgroundLoopKind::from_usize(i);
waiting_tasks.with_label_values(&[kind.into()])
})),
running_tasks: EnumMap::from_array(std::array::from_fn(|i| {
let kind = BackgroundLoopKind::from_usize(i);
running_tasks.with_label_values(&[kind.into()])
})),
}
});
},
);
impl BackgroundLoopSemaphoreMetrics {
/// Starts recording semaphore metrics. Call `acquired()` on the returned recorder when the
/// semaphore is acquired, and drop it when the task completes or is cancelled.
pub(crate) fn record(
&self,
task: BackgroundLoopKind,
) -> BackgroundLoopSemaphoreMetricsRecorder {
BackgroundLoopSemaphoreMetricsRecorder::start(self, task)
}
}
/// Records metrics for a background task.
pub struct BackgroundLoopSemaphoreMetricsRecorder<'a> {
metrics: &'a BackgroundLoopSemaphoreMetrics,
task: BackgroundLoopKind,
start: Instant,
wait_counter_guard: Option<metrics::IntCounterPairGuard>,
}
impl<'a> BackgroundLoopSemaphoreMetricsRecorder<'a> {
/// Starts recording semaphore metrics, by recording wait time and incrementing
/// `wait_start_count` and `waiting_tasks`.
fn start(metrics: &'a BackgroundLoopSemaphoreMetrics, task: BackgroundLoopKind) -> Self {
metrics.waiting_tasks[task].inc();
Self {
metrics,
task,
start: Instant::now(),
wait_counter_guard: Some(metrics.counters[task].guard()),
pub(crate) fn measure_acquisition(&self, task: BackgroundLoopKind) -> impl Drop + '_ {
struct Record<'a> {
metrics: &'a BackgroundLoopSemaphoreMetrics,
task: BackgroundLoopKind,
_counter_guard: metrics::IntCounterPairGuard,
start: Instant,
}
}
/// Signals that the semaphore has been acquired, and updates relevant metrics.
pub fn acquired(&mut self) -> Duration {
let waited = self.start.elapsed();
self.wait_counter_guard.take().expect("already acquired");
self.metrics.durations[self.task].observe(waited.as_secs_f64());
self.metrics.waiting_tasks[self.task].dec();
self.metrics.running_tasks[self.task].inc();
waited
}
}
impl Drop for BackgroundLoopSemaphoreMetricsRecorder<'_> {
/// The task either completed or was cancelled.
fn drop(&mut self) {
if self.wait_counter_guard.take().is_some() {
// Waiting.
self.metrics.durations[self.task].observe(self.start.elapsed().as_secs_f64());
self.metrics.waiting_tasks[self.task].dec();
} else {
// Running.
self.metrics.running_tasks[self.task].dec();
impl Drop for Record<'_> {
fn drop(&mut self) {
let elapsed = self.start.elapsed().as_secs_f64();
self.metrics.durations[self.task].inc_by(elapsed);
}
}
Record {
metrics: self,
task,
_counter_guard: self.counters[task].guard(),
start: Instant::now(),
}
}
}
@@ -2572,7 +2514,7 @@ pub(crate) static WAL_REDO_PROCESS_LAUNCH_DURATION_HISTOGRAM: Lazy<Histogram> =
pub(crate) struct WalRedoProcessCounters {
pub(crate) started: IntCounter,
pub(crate) killed_by_cause: EnumMap<WalRedoKillCause, IntCounter>,
pub(crate) killed_by_cause: enum_map::EnumMap<WalRedoKillCause, IntCounter>,
pub(crate) active_stderr_logger_tasks_started: IntCounter,
pub(crate) active_stderr_logger_tasks_finished: IntCounter,
}
@@ -2614,7 +2556,7 @@ impl Default for WalRedoProcessCounters {
Self {
started,
killed_by_cause: EnumMap::from_array(std::array::from_fn(|i| {
let cause = WalRedoKillCause::from_usize(i);
let cause = <WalRedoKillCause as enum_map::Enum>::from_usize(i);
let cause_str: &'static str = cause.into();
killed.with_label_values(&[cause_str])
})),

View File

@@ -489,6 +489,7 @@ impl timeline::handle::TenantManager<TenantManagerTypes> for TenantManagerWrappe
let timeline = tenant_shard
.get_timeline(timeline_id, true)
.map_err(GetActiveTimelineError::Timeline)?;
set_tracing_field_shard_id(&timeline);
Ok(timeline)
}
}
@@ -773,11 +774,11 @@ impl PageServerHandler {
let batched_msg = match neon_fe_msg {
PagestreamFeMessage::Exists(req) => {
let span = tracing::info_span!(parent: parent_span, "handle_get_rel_exists_request", rel = %req.rel, req_lsn = %req.hdr.request_lsn);
let shard = timeline_handles
.get(tenant_id, timeline_id, ShardSelector::Zero)
.instrument(span.clone()) // sets `shard_id` field
.await?;
debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
let span = tracing::info_span!(parent: &parent_span, "handle_get_rel_exists_request", rel = %req.rel, req_lsn = %req.hdr.request_lsn, shard_id = %shard.tenant_shard_id.shard_slug());
let timer = record_op_start_and_throttle(
&shard,
metrics::SmgrQueryType::GetRelExists,
@@ -792,10 +793,11 @@ impl PageServerHandler {
}
}
PagestreamFeMessage::Nblocks(req) => {
let span = tracing::info_span!(parent: parent_span, "handle_get_nblocks_request", rel = %req.rel, req_lsn = %req.hdr.request_lsn);
let shard = timeline_handles
.get(tenant_id, timeline_id, ShardSelector::Zero)
.instrument(span.clone()) // sets `shard_id` field
.await?;
let span = tracing::info_span!(parent: &parent_span, "handle_get_nblocks_request", rel = %req.rel, req_lsn = %req.hdr.request_lsn, shard_id = %shard.tenant_shard_id.shard_slug());
let timer = record_op_start_and_throttle(
&shard,
metrics::SmgrQueryType::GetRelSize,
@@ -810,10 +812,11 @@ impl PageServerHandler {
}
}
PagestreamFeMessage::DbSize(req) => {
let span = tracing::info_span!(parent: parent_span, "handle_db_size_request", dbnode = %req.dbnode, req_lsn = %req.hdr.request_lsn);
let shard = timeline_handles
.get(tenant_id, timeline_id, ShardSelector::Zero)
.instrument(span.clone()) // sets `shard_id` field
.await?;
let span = tracing::info_span!(parent: &parent_span, "handle_db_size_request", dbnode = %req.dbnode, req_lsn = %req.hdr.request_lsn, shard_id = %shard.tenant_shard_id.shard_slug());
let timer = record_op_start_and_throttle(
&shard,
metrics::SmgrQueryType::GetDbSize,
@@ -828,10 +831,11 @@ impl PageServerHandler {
}
}
PagestreamFeMessage::GetSlruSegment(req) => {
let span = tracing::info_span!(parent: parent_span, "handle_get_slru_segment_request", kind = %req.kind, segno = %req.segno, req_lsn = %req.hdr.request_lsn);
let shard = timeline_handles
.get(tenant_id, timeline_id, ShardSelector::Zero)
.instrument(span.clone()) // sets `shard_id` field
.await?;
let span = tracing::info_span!(parent: &parent_span, "handle_get_slru_segment_request", kind = %req.kind, segno = %req.segno, req_lsn = %req.hdr.request_lsn, shard_id = %shard.tenant_shard_id.shard_slug());
let timer = record_op_start_and_throttle(
&shard,
metrics::SmgrQueryType::GetSlruSegment,
@@ -846,20 +850,12 @@ impl PageServerHandler {
}
}
PagestreamFeMessage::GetPage(req) => {
// avoid a somewhat costly Span::record() by constructing the entire span in one go.
macro_rules! mkspan {
(before shard routing) => {{
tracing::info_span!(parent: &parent_span, "handle_get_page_request", rel = %req.rel, blkno = %req.blkno, req_lsn = %req.hdr.request_lsn)
}};
($shard_id:expr) => {{
tracing::info_span!(parent: &parent_span, "handle_get_page_request", rel = %req.rel, blkno = %req.blkno, req_lsn = %req.hdr.request_lsn, shard_id = %$shard_id)
}};
}
let span = tracing::info_span!(parent: parent_span, "handle_get_page_at_lsn_request_batched", req_lsn = %req.hdr.request_lsn);
macro_rules! respond_error {
($span:expr, $error:expr) => {{
($error:expr) => {{
let error = BatchedFeMessage::RespondError {
span: $span,
span,
error: BatchedPageStreamError {
req: req.hdr,
err: $error,
@@ -872,35 +868,27 @@ impl PageServerHandler {
let key = rel_block_to_key(req.rel, req.blkno);
let shard = match timeline_handles
.get(tenant_id, timeline_id, ShardSelector::Page(key))
.instrument(span.clone()) // sets `shard_id` field
.await
{
Ok(tl) => tl,
Err(GetActiveTimelineError::Tenant(GetActiveTenantError::NotFound(_))) => {
// We already know this tenant exists in general, because we resolved it at
// start of connection. Getting a NotFound here indicates that the shard containing
// the requested page is not present on this node: the client's knowledge of shard->pageserver
// mapping is out of date.
//
// Closing the connection by returning ``::Reconnect` has the side effect of rate-limiting above message, via
// client's reconnect backoff, as well as hopefully prompting the client to load its updated configuration
// and talk to a different pageserver.
return respond_error!(PageStreamError::Reconnect(
"getpage@lsn request routed to wrong shard".into()
));
}
Err(e) => {
let span = mkspan!(before shard routing);
match e {
GetActiveTimelineError::Tenant(GetActiveTenantError::NotFound(_)) => {
// We already know this tenant exists in general, because we resolved it at
// start of connection. Getting a NotFound here indicates that the shard containing
// the requested page is not present on this node: the client's knowledge of shard->pageserver
// mapping is out of date.
//
// Closing the connection by returning ``::Reconnect` has the side effect of rate-limiting above message, via
// client's reconnect backoff, as well as hopefully prompting the client to load its updated configuration
// and talk to a different pageserver.
return respond_error!(
span,
PageStreamError::Reconnect(
"getpage@lsn request routed to wrong shard".into()
)
);
}
e => {
return respond_error!(span, e.into());
}
}
return respond_error!(e.into());
}
};
let span = mkspan!(shard.tenant_shard_id.shard_slug());
let timer = record_op_start_and_throttle(
&shard,
@@ -922,7 +910,7 @@ impl PageServerHandler {
{
Ok(lsn) => lsn,
Err(e) => {
return respond_error!(span, e);
return respond_error!(e);
}
};
BatchedFeMessage::GetPage {
@@ -934,10 +922,11 @@ impl PageServerHandler {
}
#[cfg(feature = "testing")]
PagestreamFeMessage::Test(req) => {
let span = tracing::info_span!(parent: parent_span, "handle_test_request");
let shard = timeline_handles
.get(tenant_id, timeline_id, ShardSelector::Zero)
.instrument(span.clone()) // sets `shard_id` field
.await?;
let span = tracing::info_span!(parent: &parent_span, "handle_test_request", shard_id = %shard.tenant_shard_id.shard_slug());
let timer =
record_op_start_and_throttle(&shard, metrics::SmgrQueryType::Test, received_at)
.await?;
@@ -1201,29 +1190,6 @@ impl PageServerHandler {
}
};
// We purposefully don't count flush time into the smgr operaiton timer.
//
// The reason is that current compute client will not perform protocol processing
// if the postgres backend process is doing things other than `->smgr_read()`.
// This is especially the case for prefetch.
//
// If the compute doesn't read from the connection, eventually TCP will backpressure
// all the way into our flush call below.
//
// The timer's underlying metric is used for a storage-internal latency SLO and
// we don't want to include latency in it that we can't control.
// And as pointed out above, in this case, we don't control the time that flush will take.
//
// We put each response in the batch onto the wire in a separate pgb_writer.flush()
// call, which (all unmeasured) adds syscall overhead but reduces time to first byte
// and avoids building up a "giant" contiguous userspace buffer to hold the entire response.
// TODO: vectored socket IO would be great, but pgb_writer doesn't support that.
//
// Since we're flushing multiple times in the loop, but only have access to the per-op
// timers inside the loop, we capture the flush start time here and reuse it to finish
// each op timer.
let flushing_start_time = Instant::now();
// Map handler result to protocol behavior.
// Some handler errors cause exit from pagestream protocol.
// Other handler errors are sent back as an error message and we stay in pagestream protocol.
@@ -1272,9 +1238,21 @@ impl PageServerHandler {
&response_msg.serialize(protocol_version),
))?;
// We purposefully don't count flush time into the timer.
//
// The reason is that current compute client will not perform protocol processing
// if the postgres backend process is doing things other than `->smgr_read()`.
// This is especially the case for prefetch.
//
// If the compute doesn't read from the connection, eventually TCP will backpressure
// all the way into our flush call below.
//
// The timer's underlying metric is used for a storage-internal latency SLO and
// we don't want to include latency in it that we can't control.
// And as pointed out above, in this case, we don't control the time that flush will take.
let flushing_timer = timer.map(|mut timer| {
timer
.observe_execution_end_flush_start(flushing_start_time)
.observe_execution_end_flush_start(Instant::now())
.expect("we are the first caller")
});
@@ -1302,6 +1280,8 @@ impl PageServerHandler {
}
Ok(())
}
// and log the info! line inside the request span
.instrument(span.clone())
.await?;
}
Ok(())
@@ -1362,7 +1342,7 @@ impl PageServerHandler {
.take()
.expect("implementation error: timeline_handles should not be locked");
let request_span = info_span!("request");
let request_span = info_span!("request", shard_id = tracing::field::Empty);
let ((pgb_reader, timeline_handles), result) = match self.pipelining_config.clone() {
PageServicePipeliningConfig::Pipelined(pipelining_config) => {
self.handle_pagerequests_pipelined(
@@ -1712,7 +1692,7 @@ impl PageServerHandler {
// to distinguish a misbehaving client (asking for old LSN) from a storage issue (data missing at a legitimate LSN).
if request_lsn < **latest_gc_cutoff_lsn && !timeline.is_gc_blocked_by_lsn_lease_deadline() {
let gc_info = &timeline.gc_info.read().unwrap();
if !gc_info.lsn_covered_by_lease(request_lsn) {
if !gc_info.leases.contains_key(&request_lsn) {
return Err(
PageStreamError::BadRequest(format!(
"tried to request a page version that was garbage collected. requested at {} gc cutoff {}",
@@ -2056,13 +2036,6 @@ impl PageServerHandler {
.unwrap()
.get(tenant_id, timeline_id, ShardSelector::Zero)
.await?;
set_tracing_field_shard_id(&timeline);
if timeline.is_archived() == Some(true) {
// TODO after a grace period, turn this log line into a hard error
tracing::warn!("timeline {tenant_id}/{timeline_id} is archived, but got basebackup request for it.");
//return Err(QueryError::NotFound("timeline is archived".into()))
}
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
if let Some(lsn) = lsn {

View File

@@ -612,18 +612,11 @@ impl Timeline {
pausable_failpoint!("find-lsn-for-timestamp-pausable");
let gc_cutoff_lsn_guard = self.get_latest_gc_cutoff_lsn();
let gc_cutoff_planned = {
let gc_info = self.gc_info.read().unwrap();
gc_info.min_cutoff()
};
// Usually the planned cutoff is newer than the cutoff of the last gc run,
// but let's be defensive.
let gc_cutoff = gc_cutoff_planned.max(*gc_cutoff_lsn_guard);
// We use this method to figure out the branching LSN for the new branch, but the
// GC cutoff could be before the branching point and we cannot create a new branch
// with LSN < `ancestor_lsn`. Thus, pick the maximum of these two to be
// on the safe side.
let min_lsn = std::cmp::max(gc_cutoff, self.get_ancestor_lsn());
let min_lsn = std::cmp::max(*gc_cutoff_lsn_guard, self.get_ancestor_lsn());
let max_lsn = self.get_last_record_lsn();
// LSNs are always 8-byte aligned. low/mid/high represent the

View File

@@ -328,8 +328,8 @@ pub enum TaskKind {
// Eviction. One per timeline.
Eviction,
// Tenant housekeeping (flush idle ephemeral layers, shut down idle walredo, etc.).
TenantHousekeeping,
// Ingest housekeeping (flushing ephemeral layers on time threshold or disk pressure)
IngestHousekeeping,
/// See [`crate::disk_usage_eviction_task`].
DiskUsageEviction,

View File

@@ -20,7 +20,6 @@ use chrono::NaiveDateTime;
use enumset::EnumSet;
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use itertools::Itertools as _;
use pageserver_api::models;
use pageserver_api::models::CompactInfoResponse;
use pageserver_api::models::LsnLease;
@@ -3089,28 +3088,32 @@ impl Tenant {
Ok(rx)
}
/// Performs periodic housekeeping, via the tenant housekeeping background task.
async fn housekeeping(&self) {
// Call through to all timelines to freeze ephemeral layers as needed. This usually happens
// during ingest, but we don't want idle timelines to hold open layers for too long.
let timelines = self
.timelines
.lock()
.unwrap()
.values()
.filter(|tli| tli.is_active())
.cloned()
.collect_vec();
// Call through to all timelines to freeze ephemeral layers if needed. Usually
// this happens during ingest: this background housekeeping is for freezing layers
// that are open but haven't been written to for some time.
async fn ingest_housekeeping(&self) {
// Scan through the hashmap and collect a list of all the timelines,
// while holding the lock. Then drop the lock and actually perform the
// compactions. We don't want to block everything else while the
// compaction runs.
let timelines = {
self.timelines
.lock()
.unwrap()
.values()
.filter_map(|timeline| {
if timeline.is_active() {
Some(timeline.clone())
} else {
None
}
})
.collect::<Vec<_>>()
};
for timeline in timelines {
for timeline in &timelines {
timeline.maybe_freeze_ephemeral_layer().await;
}
// Shut down walredo if idle.
const WALREDO_IDLE_TIMEOUT: Duration = Duration::from_secs(180);
if let Some(ref walredo_mgr) = self.walredo_mgr {
walredo_mgr.maybe_quiesce(WALREDO_IDLE_TIMEOUT);
}
}
pub fn timeline_has_no_attached_children(&self, timeline_id: TimelineId) -> bool {
@@ -4639,26 +4642,22 @@ impl Tenant {
// check against last actual 'latest_gc_cutoff' first
let latest_gc_cutoff_lsn = src_timeline.get_latest_gc_cutoff_lsn();
src_timeline
.check_lsn_is_in_scope(start_lsn, &latest_gc_cutoff_lsn)
.context(format!(
"invalid branch start lsn: less than latest GC cutoff {}",
*latest_gc_cutoff_lsn,
))
.map_err(CreateTimelineError::AncestorLsn)?;
// and then the planned GC cutoff
{
let gc_info = src_timeline.gc_info.read().unwrap();
let planned_cutoff = gc_info.min_cutoff();
if gc_info.lsn_covered_by_lease(start_lsn) {
tracing::info!("skipping comparison of {start_lsn} with gc cutoff {} and planned gc cutoff {planned_cutoff} due to lsn lease", *latest_gc_cutoff_lsn);
} else {
src_timeline
.check_lsn_is_in_scope(start_lsn, &latest_gc_cutoff_lsn)
.context(format!(
"invalid branch start lsn: less than latest GC cutoff {}",
*latest_gc_cutoff_lsn,
))
.map_err(CreateTimelineError::AncestorLsn)?;
// and then the planned GC cutoff
if start_lsn < planned_cutoff {
return Err(CreateTimelineError::AncestorLsn(anyhow::anyhow!(
"invalid branch start lsn: less than planned GC cutoff {planned_cutoff}"
)));
}
let cutoff = gc_info.min_cutoff();
if start_lsn < cutoff {
return Err(CreateTimelineError::AncestorLsn(anyhow::anyhow!(
"invalid branch start lsn: less than planned GC cutoff {cutoff}"
)));
}
}

View File

@@ -437,7 +437,8 @@ impl RemoteTimelineClient {
.conf
.remote_storage_config
.as_ref()
.map_or(0, |r| r.concurrency_limit());
.and_then(|r| r.concurrency_limit())
.unwrap_or(0);
let mut upload_queue = self.upload_queue.lock().unwrap();
upload_queue.initialize_with_current_remote_index_part(index_part, inprogress_limit)?;
self.update_remote_physical_size_gauge(Some(index_part));
@@ -460,7 +461,8 @@ impl RemoteTimelineClient {
.conf
.remote_storage_config
.as_ref()
.map_or(0, |r| r.concurrency_limit());
.and_then(|r| r.concurrency_limit())
.unwrap_or(0);
let mut upload_queue = self.upload_queue.lock().unwrap();
upload_queue.initialize_empty_remote(local_metadata, inprogress_limit)?;
self.update_remote_physical_size_gauge(None);
@@ -482,7 +484,8 @@ impl RemoteTimelineClient {
.conf
.remote_storage_config
.as_ref()
.map_or(0, |r| r.concurrency_limit());
.and_then(|r| r.concurrency_limit())
.unwrap_or(0);
let mut upload_queue = self.upload_queue.lock().unwrap();
upload_queue.initialize_with_current_remote_index_part(index_part, inprogress_limit)?;
@@ -517,7 +520,7 @@ impl RemoteTimelineClient {
if let Ok(queue) = queue_locked.initialized_mut() {
let blocked_deletions = std::mem::take(&mut queue.blocked_deletions);
for d in blocked_deletions {
if let Err(e) = self.deletion_queue_client.push_layers(
if let Err(e) = self.deletion_queue_client.push_layers_sync(
self.tenant_shard_id,
self.timeline_id,
self.generation,
@@ -2151,6 +2154,7 @@ impl RemoteTimelineClient {
self.generation,
delete.layers.clone(),
)
.await
.map_err(|e| anyhow::anyhow!(e))
}
}

View File

@@ -1,80 +1,53 @@
//! This module contains per-tenant background processes, e.g. compaction and GC.
//! This module contains functions to serve per-tenant background processes,
//! such as compaction and GC
use std::cmp::max;
use std::future::Future;
use std::ops::{ControlFlow, RangeInclusive};
use std::pin::pin;
use std::sync::{Arc, Mutex};
use std::ops::ControlFlow;
use std::str::FromStr;
use std::sync::Arc;
use std::time::{Duration, Instant};
use once_cell::sync::Lazy;
use rand::Rng;
use scopeguard::defer;
use tokio::sync::{Semaphore, SemaphorePermit};
use tokio_util::sync::CancellationToken;
use tracing::*;
use crate::context::{DownloadBehavior, RequestContext};
use crate::metrics::{BackgroundLoopSemaphoreMetricsRecorder, TENANT_TASK_EVENTS};
use crate::task_mgr::{self, TaskKind, BACKGROUND_RUNTIME, TOKIO_WORKER_THREADS};
use crate::metrics::TENANT_TASK_EVENTS;
use crate::task_mgr;
use crate::task_mgr::{TaskKind, BACKGROUND_RUNTIME};
use crate::tenant::throttle::Stats;
use crate::tenant::timeline::compaction::CompactionOutcome;
use crate::tenant::timeline::CompactionError;
use crate::tenant::{Tenant, TenantState};
use pageserver_api::config::tenant_conf_defaults::DEFAULT_COMPACTION_PERIOD;
use utils::completion::Barrier;
use utils::rate_limit::RateLimit;
use utils::{backoff, pausable_failpoint};
use rand::Rng;
use tokio_util::sync::CancellationToken;
use tracing::*;
use utils::{backoff, completion, pausable_failpoint};
/// Semaphore limiting concurrent background tasks (across all tenants).
///
/// We use 3/4 Tokio threads, to avoid blocking all threads in case we do any CPU-heavy work.
static CONCURRENT_BACKGROUND_TASKS: Lazy<Semaphore> = Lazy::new(|| {
let total_threads = TOKIO_WORKER_THREADS.get();
let permits = max(1, (total_threads * 3).checked_div(4).unwrap_or(0));
assert_ne!(permits, 0, "we will not be adding in permits later");
assert!(permits < total_threads, "need threads for other work");
Semaphore::new(permits)
});
static CONCURRENT_BACKGROUND_TASKS: once_cell::sync::Lazy<tokio::sync::Semaphore> =
once_cell::sync::Lazy::new(|| {
let total_threads = task_mgr::TOKIO_WORKER_THREADS.get();
let permits = usize::max(
1,
// while a lot of the work is done on spawn_blocking, we still do
// repartitioning in the async context. this should give leave us some workers
// unblocked to be blocked on other work, hopefully easing any outside visible
// effects of restarts.
//
// 6/8 is a guess; previously we ran with unlimited 8 and more from
// spawn_blocking.
(total_threads * 3).checked_div(4).unwrap_or(0),
);
assert_ne!(permits, 0, "we will not be adding in permits later");
assert!(
permits < total_threads,
"need threads avail for shorter work"
);
tokio::sync::Semaphore::new(permits)
});
/// Semaphore limiting concurrent compaction tasks (across all tenants). This is disabled by
/// default, see `use_compaction_semaphore`.
///
/// We use 3/4 Tokio threads, to avoid blocking all threads in case we do any CPU-heavy work.
///
/// This is a separate semaphore from background tasks, because L0 compaction needs to be responsive
/// to avoid high read amp during heavy write workloads.
///
/// TODO: split image compaction and L0 compaction, and move image compaction to background tasks.
/// Only L0 compaction needs to be responsive, and it shouldn't block on image compaction.
static CONCURRENT_COMPACTION_TASKS: Lazy<Semaphore> = Lazy::new(|| {
let total_threads = TOKIO_WORKER_THREADS.get();
let permits = max(1, (total_threads * 3).checked_div(4).unwrap_or(0));
assert_ne!(permits, 0, "we will not be adding in permits later");
assert!(permits < total_threads, "need threads for other work");
Semaphore::new(permits)
});
/// Background jobs.
///
/// NB: not all of these acquire a CONCURRENT_BACKGROUND_TASKS semaphore permit, only the ones that
/// do any significant IO.
#[derive(
Debug,
PartialEq,
Eq,
Clone,
Copy,
strum_macros::IntoStaticStr,
strum_macros::Display,
enum_map::Enum,
)]
#[derive(Debug, PartialEq, Eq, Clone, Copy, strum_macros::IntoStaticStr, enum_map::Enum)]
#[strum(serialize_all = "snake_case")]
pub(crate) enum BackgroundLoopKind {
Compaction,
Gc,
Eviction,
TenantHouseKeeping,
IngestHouseKeeping,
ConsumptionMetricsCollectMetrics,
ConsumptionMetricsSyntheticSizeWorker,
InitialLogicalSizeCalculation,
@@ -82,56 +55,36 @@ pub(crate) enum BackgroundLoopKind {
SecondaryDownload,
}
pub struct BackgroundLoopSemaphorePermit<'a> {
_permit: SemaphorePermit<'static>,
_recorder: BackgroundLoopSemaphoreMetricsRecorder<'a>,
impl BackgroundLoopKind {
fn as_static_str(&self) -> &'static str {
self.into()
}
}
/// Acquires a semaphore permit, to limit concurrent background jobs.
pub(crate) async fn acquire_concurrency_permit(
/// Cancellation safe.
pub(crate) async fn concurrent_background_tasks_rate_limit_permit(
loop_kind: BackgroundLoopKind,
use_compaction_semaphore: bool,
_ctx: &RequestContext,
) -> BackgroundLoopSemaphorePermit<'static> {
// TODO: use a lower threshold and remove the pacer once we resolve some blockage.
const WARN_THRESHOLD: Duration = Duration::from_secs(600);
static WARN_PACER: Lazy<Mutex<RateLimit>> =
Lazy::new(|| Mutex::new(RateLimit::new(Duration::from_secs(10))));
let mut recorder = crate::metrics::BACKGROUND_LOOP_SEMAPHORE.record(loop_kind);
) -> tokio::sync::SemaphorePermit<'static> {
let _guard = crate::metrics::BACKGROUND_LOOP_SEMAPHORE.measure_acquisition(loop_kind);
if loop_kind == BackgroundLoopKind::InitialLogicalSizeCalculation {
pausable_failpoint!("initial-size-calculation-permit-pause");
}
// TODO: assert that we run on BACKGROUND_RUNTIME; requires tokio_unstable Handle::id();
let permit = if loop_kind == BackgroundLoopKind::Compaction && use_compaction_semaphore {
CONCURRENT_COMPACTION_TASKS.acquire().await
} else {
assert!(!use_compaction_semaphore);
CONCURRENT_BACKGROUND_TASKS.acquire().await
}
.expect("should never close");
let waited = recorder.acquired();
if waited >= WARN_THRESHOLD {
let waited = waited.as_secs_f64();
WARN_PACER
.lock()
.unwrap()
.call(|| warn!("{loop_kind} task waited {waited:.3}s for semaphore permit"));
}
BackgroundLoopSemaphorePermit {
_permit: permit,
_recorder: recorder,
match CONCURRENT_BACKGROUND_TASKS.acquire().await {
Ok(permit) => permit,
Err(_closed) => unreachable!("we never close the semaphore"),
}
}
/// Start per tenant background loops: compaction, GC, and ingest housekeeping.
pub fn start_background_loops(tenant: &Arc<Tenant>, can_start: Option<&Barrier>) {
/// Start per tenant background loops: compaction and gc.
pub fn start_background_loops(
tenant: &Arc<Tenant>,
background_jobs_can_start: Option<&completion::Barrier>,
) {
let tenant_shard_id = tenant.tenant_shard_id;
task_mgr::spawn(
BACKGROUND_RUNTIME.handle(),
TaskKind::Compaction,
@@ -140,15 +93,13 @@ pub fn start_background_loops(tenant: &Arc<Tenant>, can_start: Option<&Barrier>)
&format!("compactor for tenant {tenant_shard_id}"),
{
let tenant = Arc::clone(tenant);
let can_start = can_start.cloned();
let background_jobs_can_start = background_jobs_can_start.cloned();
async move {
let cancel = task_mgr::shutdown_token(); // NB: must be in async context
let cancel = task_mgr::shutdown_token();
tokio::select! {
_ = cancel.cancelled() => return Ok(()),
_ = Barrier::maybe_wait(can_start) => {}
_ = cancel.cancelled() => { return Ok(()) },
_ = completion::Barrier::maybe_wait(background_jobs_can_start) => {}
};
TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
defer!(TENANT_TASK_EVENTS.with_label_values(&["stop"]).inc());
compaction_loop(tenant, cancel)
// If you rename this span, change the RUST_LOG env variable in test_runner/performance/test_branch_creation.py
.instrument(info_span!("compaction_loop", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug()))
@@ -157,7 +108,6 @@ pub fn start_background_loops(tenant: &Arc<Tenant>, can_start: Option<&Barrier>)
}
},
);
task_mgr::spawn(
BACKGROUND_RUNTIME.handle(),
TaskKind::GarbageCollector,
@@ -166,15 +116,13 @@ pub fn start_background_loops(tenant: &Arc<Tenant>, can_start: Option<&Barrier>)
&format!("garbage collector for tenant {tenant_shard_id}"),
{
let tenant = Arc::clone(tenant);
let can_start = can_start.cloned();
let background_jobs_can_start = background_jobs_can_start.cloned();
async move {
let cancel = task_mgr::shutdown_token(); // NB: must be in async context
let cancel = task_mgr::shutdown_token();
tokio::select! {
_ = cancel.cancelled() => return Ok(()),
_ = Barrier::maybe_wait(can_start) => {}
_ = cancel.cancelled() => { return Ok(()) },
_ = completion::Barrier::maybe_wait(background_jobs_can_start) => {}
};
TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
defer!(TENANT_TASK_EVENTS.with_label_values(&["stop"]).inc());
gc_loop(tenant, cancel)
.instrument(info_span!("gc_loop", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug()))
.await;
@@ -185,23 +133,21 @@ pub fn start_background_loops(tenant: &Arc<Tenant>, can_start: Option<&Barrier>)
task_mgr::spawn(
BACKGROUND_RUNTIME.handle(),
TaskKind::TenantHousekeeping,
TaskKind::IngestHousekeeping,
tenant_shard_id,
None,
&format!("housekeeping for tenant {tenant_shard_id}"),
&format!("ingest housekeeping for tenant {tenant_shard_id}"),
{
let tenant = Arc::clone(tenant);
let can_start = can_start.cloned();
let background_jobs_can_start = background_jobs_can_start.cloned();
async move {
let cancel = task_mgr::shutdown_token(); // NB: must be in async context
let cancel = task_mgr::shutdown_token();
tokio::select! {
_ = cancel.cancelled() => return Ok(()),
_ = Barrier::maybe_wait(can_start) => {}
_ = cancel.cancelled() => { return Ok(()) },
_ = completion::Barrier::maybe_wait(background_jobs_can_start) => {}
};
TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
defer!(TENANT_TASK_EVENTS.with_label_values(&["stop"]).inc());
tenant_housekeeping_loop(tenant, cancel)
.instrument(info_span!("tenant_housekeeping_loop", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug()))
ingest_housekeeping_loop(tenant, cancel)
.instrument(info_span!("ingest_housekeeping_loop", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug()))
.await;
Ok(())
}
@@ -209,293 +155,372 @@ pub fn start_background_loops(tenant: &Arc<Tenant>, can_start: Option<&Barrier>)
);
}
/// Compaction task's main loop.
///
/// Compaction task's main loop
///
async fn compaction_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
const MAX_BACKOFF_SECS: f64 = 300.0;
// How many errors we have seen consequtively
let mut error_run_count = 0;
let ctx = RequestContext::todo_child(TaskKind::Compaction, DownloadBehavior::Download);
let mut first = true;
let mut error_run = 0; // consecutive errors
TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
async {
let ctx = RequestContext::todo_child(TaskKind::Compaction, DownloadBehavior::Download);
let mut first = true;
loop {
tokio::select! {
_ = cancel.cancelled() => {
return;
},
tenant_wait_result = wait_for_active_tenant(&tenant) => match tenant_wait_result {
ControlFlow::Break(()) => return,
ControlFlow::Continue(()) => (),
},
}
loop {
if wait_for_active_tenant(&tenant, &cancel).await.is_break() {
return;
}
let period = tenant.get_compaction_period();
let period = tenant.get_compaction_period();
// TODO: we shouldn't need to await to find tenant and this could be moved outside of
// loop, #3501. There are also additional "allowed_errors" in tests.
if first {
first = false;
if random_init_delay(period, &cancel).await.is_err() {
break;
}
}
// TODO: we shouldn't need to await to find tenant and this could be moved outside of
// loop, #3501. There are also additional "allowed_errors" in tests.
if first {
first = false;
if sleep_random(period, &cancel).await.is_err() {
let sleep_duration;
if period == Duration::ZERO {
#[cfg(not(feature = "testing"))]
info!("automatic compaction is disabled");
// check again in 10 seconds, in case it's been enabled again.
sleep_duration = Duration::from_secs(10)
} else {
let iteration = Iteration {
started_at: Instant::now(),
period,
kind: BackgroundLoopKind::Compaction,
};
// Run compaction
let IterationResult { output, elapsed } = iteration
.run(tenant.compaction_iteration(&cancel, &ctx))
.await;
match output {
Ok(outcome) => {
error_run_count = 0;
// schedule the next compaction immediately in case there is a pending compaction task
sleep_duration = if let CompactionOutcome::Pending = outcome {
Duration::ZERO
} else {
period
};
}
Err(e) => {
let wait_duration = backoff::exponential_backoff_duration_seconds(
error_run_count + 1,
1.0,
MAX_BACKOFF_SECS,
);
error_run_count += 1;
let wait_duration = Duration::from_secs_f64(wait_duration);
log_compaction_error(
&e,
error_run_count,
&wait_duration,
cancel.is_cancelled(),
);
sleep_duration = wait_duration;
}
}
// the duration is recorded by performance tests by enabling debug in this function
tracing::debug!(
elapsed_ms = elapsed.as_millis(),
"compaction iteration complete"
);
};
// Perhaps we did no work and the walredo process has been idle for some time:
// give it a chance to shut down to avoid leaving walredo process running indefinitely.
// TODO: move this to a separate task (housekeeping loop) that isn't affected by the back-off,
// so we get some upper bound guarantee on when walredo quiesce / this throttling reporting here happens.
if let Some(walredo_mgr) = &tenant.walredo_mgr {
walredo_mgr.maybe_quiesce(period * 10);
}
// Sleep
if tokio::time::timeout(sleep_duration, cancel.cancelled())
.await
.is_ok()
{
break;
}
}
let sleep_duration;
if period == Duration::ZERO {
#[cfg(not(feature = "testing"))]
info!("automatic compaction is disabled");
// check again in 10 seconds, in case it's been enabled again.
sleep_duration = Duration::from_secs(10)
} else {
let iteration = Iteration {
started_at: Instant::now(),
period,
kind: BackgroundLoopKind::Compaction,
};
// Run compaction
let IterationResult { output, elapsed } = iteration
.run(tenant.compaction_iteration(&cancel, &ctx))
.await;
match output {
Ok(outcome) => {
error_run = 0;
// schedule the next compaction immediately in case there is a pending compaction task
sleep_duration = if let CompactionOutcome::Pending = outcome {
Duration::from_secs(1)
} else {
period
};
}
Err(err) => {
let wait_duration = backoff::exponential_backoff_duration_seconds(
error_run + 1,
1.0,
MAX_BACKOFF_SECS,
);
error_run += 1;
let wait_duration = Duration::from_secs_f64(wait_duration);
log_compaction_error(&err, error_run, &wait_duration, cancel.is_cancelled());
sleep_duration = wait_duration;
}
}
// the duration is recorded by performance tests by enabling debug in this function
debug!(
elapsed_ms = elapsed.as_millis(),
"compaction iteration complete"
);
};
// Sleep
if tokio::time::timeout(sleep_duration, cancel.cancelled())
.await
.is_ok()
{
break;
}
}
.await;
TENANT_TASK_EVENTS.with_label_values(&["stop"]).inc();
}
fn log_compaction_error(
err: &CompactionError,
error_count: u32,
sleep_duration: &Duration,
e: &CompactionError,
error_run_count: u32,
sleep_duration: &std::time::Duration,
task_cancelled: bool,
) {
use crate::tenant::upload_queue::NotInitialized;
use crate::tenant::PageReconstructError;
use CompactionError::*;
let level = match err {
ShuttingDown => return,
Offload(_) => Level::ERROR,
_ if task_cancelled => Level::INFO,
Other(err) => {
let root_cause = err.root_cause();
enum LooksLike {
Info,
Error,
}
let upload_queue = root_cause
.downcast_ref::<NotInitialized>()
.is_some_and(|e| e.is_stopping());
let timeline = root_cause
.downcast_ref::<PageReconstructError>()
.is_some_and(|e| e.is_stopping());
let is_stopping = upload_queue || timeline;
let decision = match e {
ShuttingDown => None,
Offload(_) => Some(LooksLike::Error),
_ if task_cancelled => Some(LooksLike::Info),
Other(e) => {
let root_cause = e.root_cause();
let is_stopping = {
let upload_queue = root_cause
.downcast_ref::<NotInitialized>()
.is_some_and(|e| e.is_stopping());
let timeline = root_cause
.downcast_ref::<PageReconstructError>()
.is_some_and(|e| e.is_stopping());
upload_queue || timeline
};
if is_stopping {
Level::INFO
Some(LooksLike::Info)
} else {
Level::ERROR
Some(LooksLike::Error)
}
}
};
match level {
Level::ERROR => {
error!("Compaction failed {error_count} times, retrying in {sleep_duration:?}: {err:#}")
}
Level::INFO => {
info!("Compaction failed {error_count} times, retrying in {sleep_duration:?}: {err:#}")
}
level => unimplemented!("unexpected level {level:?}"),
match decision {
Some(LooksLike::Info) => info!(
"Compaction failed {error_run_count} times, retrying in {sleep_duration:?}: {e:#}",
),
Some(LooksLike::Error) => error!(
"Compaction failed {error_run_count} times, retrying in {sleep_duration:?}: {e:?}",
),
None => {}
}
}
/// GC task's main loop.
///
/// GC task's main loop
///
async fn gc_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
const MAX_BACKOFF_SECS: f64 = 300.0;
let mut error_run = 0; // consecutive errors
// How many errors we have seen consequtively
let mut error_run_count = 0;
// GC might require downloading, to find the cutoff LSN that corresponds to the
// cutoff specified as time.
let ctx = RequestContext::todo_child(TaskKind::GarbageCollector, DownloadBehavior::Download);
let mut first = true;
TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
async {
// GC might require downloading, to find the cutoff LSN that corresponds to the
// cutoff specified as time.
let ctx =
RequestContext::todo_child(TaskKind::GarbageCollector, DownloadBehavior::Download);
loop {
if wait_for_active_tenant(&tenant, &cancel).await.is_break() {
return;
}
let mut first = true;
loop {
tokio::select! {
_ = cancel.cancelled() => {
return;
},
tenant_wait_result = wait_for_active_tenant(&tenant) => match tenant_wait_result {
ControlFlow::Break(()) => return,
ControlFlow::Continue(()) => (),
},
}
let period = tenant.get_gc_period();
let period = tenant.get_gc_period();
if first {
first = false;
if sleep_random(period, &cancel).await.is_err() {
if first {
first = false;
let delays = async {
random_init_delay(period, &cancel).await?;
Ok::<_, Cancelled>(())
};
if delays.await.is_err() {
break;
}
}
let gc_horizon = tenant.get_gc_horizon();
let sleep_duration;
if period == Duration::ZERO || gc_horizon == 0 {
#[cfg(not(feature = "testing"))]
info!("automatic GC is disabled");
// check again in 10 seconds, in case it's been enabled again.
sleep_duration = Duration::from_secs(10);
} else {
let iteration = Iteration {
started_at: Instant::now(),
period,
kind: BackgroundLoopKind::Gc,
};
// Run gc
let IterationResult { output, elapsed: _ } =
iteration.run(tenant.gc_iteration(None, gc_horizon, tenant.get_pitr_interval(), &cancel, &ctx))
.await;
match output {
Ok(_) => {
error_run_count = 0;
sleep_duration = period;
}
Err(crate::tenant::GcError::TenantCancelled) => {
return;
}
Err(e) => {
let wait_duration = backoff::exponential_backoff_duration_seconds(
error_run_count + 1,
1.0,
MAX_BACKOFF_SECS,
);
error_run_count += 1;
let wait_duration = Duration::from_secs_f64(wait_duration);
if matches!(e, crate::tenant::GcError::TimelineCancelled) {
// Timeline was cancelled during gc. We might either be in an event
// that affects the entire tenant (tenant deletion, pageserver shutdown),
// or in one that affects the timeline only (timeline deletion).
// Therefore, don't exit the loop.
info!("Gc failed {error_run_count} times, retrying in {wait_duration:?}: {e:?}");
} else {
error!("Gc failed {error_run_count} times, retrying in {wait_duration:?}: {e:?}");
}
sleep_duration = wait_duration;
}
}
};
if tokio::time::timeout(sleep_duration, cancel.cancelled())
.await
.is_ok()
{
break;
}
}
}
.await;
TENANT_TASK_EVENTS.with_label_values(&["stop"]).inc();
}
async fn ingest_housekeeping_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
async {
let mut last_throttle_flag_reset_at = Instant::now();
loop {
tokio::select! {
_ = cancel.cancelled() => {
return;
},
tenant_wait_result = wait_for_active_tenant(&tenant) => match tenant_wait_result {
ControlFlow::Break(()) => return,
ControlFlow::Continue(()) => (),
},
}
// We run ingest housekeeping with the same frequency as compaction: it is not worth
// having a distinct setting. But we don't run it in the same task, because compaction
// blocks on acquiring the background job semaphore.
let period = tenant.get_compaction_period();
// If compaction period is set to zero (to disable it), then we will use a reasonable default
let period = if period == Duration::ZERO {
humantime::Duration::from_str(
pageserver_api::config::tenant_conf_defaults::DEFAULT_COMPACTION_PERIOD,
)
.unwrap()
.into()
} else {
period
};
// Jitter the period by +/- 5%
let period =
rand::thread_rng().gen_range((period * (95)) / 100..(period * (105)) / 100);
// Always sleep first: we do not need to do ingest housekeeping early in the lifetime of
// a tenant, since it won't have started writing any ephemeral files yet.
if tokio::time::timeout(period, cancel.cancelled())
.await
.is_ok()
{
break;
}
let gc_horizon = tenant.get_gc_horizon();
let sleep_duration;
if period == Duration::ZERO || gc_horizon == 0 {
#[cfg(not(feature = "testing"))]
info!("automatic GC is disabled");
// check again in 10 seconds, in case it's been enabled again.
sleep_duration = Duration::from_secs(10);
} else {
let iteration = Iteration {
started_at: Instant::now(),
period,
kind: BackgroundLoopKind::Gc,
kind: BackgroundLoopKind::IngestHouseKeeping,
};
// Run gc
let IterationResult { output, elapsed: _ } = iteration
.run(tenant.gc_iteration(
None,
gc_horizon,
tenant.get_pitr_interval(),
&cancel,
&ctx,
))
.await;
match output {
Ok(_) => {
error_run = 0;
sleep_duration = period;
}
Err(crate::tenant::GcError::TenantCancelled) => {
iteration.run(tenant.ingest_housekeeping()).await;
// TODO: rename the background loop kind to something more generic, like, tenant housekeeping.
// Or just spawn another background loop for this throttle, it's not like it's super costly.
info_span!(parent: None, "pagestream_throttle", tenant_id=%tenant.tenant_shard_id, shard_id=%tenant.tenant_shard_id.shard_slug()).in_scope(|| {
let now = Instant::now();
let prev = std::mem::replace(&mut last_throttle_flag_reset_at, now);
let Stats { count_accounted_start, count_accounted_finish, count_throttled, sum_throttled_usecs} = tenant.pagestream_throttle.reset_stats();
if count_throttled == 0 {
return;
}
Err(e) => {
let wait_duration = backoff::exponential_backoff_duration_seconds(
error_run + 1,
1.0,
MAX_BACKOFF_SECS,
);
error_run += 1;
let wait_duration = Duration::from_secs_f64(wait_duration);
let allowed_rps = tenant.pagestream_throttle.steady_rps();
let delta = now - prev;
info!(
n_seconds=%format_args!("{:.3}", delta.as_secs_f64()),
count_accounted = count_accounted_finish, // don't break existing log scraping
count_throttled,
sum_throttled_usecs,
count_accounted_start, // log after pre-existing fields to not break existing log scraping
allowed_rps=%format_args!("{allowed_rps:.0}"),
"shard was throttled in the last n_seconds"
);
});
}
}
.await;
TENANT_TASK_EVENTS.with_label_values(&["stop"]).inc();
}
if matches!(e, crate::tenant::GcError::TimelineCancelled) {
// Timeline was cancelled during gc. We might either be in an event
// that affects the entire tenant (tenant deletion, pageserver shutdown),
// or in one that affects the timeline only (timeline deletion).
// Therefore, don't exit the loop.
info!("Gc failed {error_run} times, retrying in {wait_duration:?}: {e:?}");
} else {
error!("Gc failed {error_run} times, retrying in {wait_duration:?}: {e:?}");
async fn wait_for_active_tenant(tenant: &Arc<Tenant>) -> ControlFlow<()> {
// if the tenant has a proper status already, no need to wait for anything
if tenant.current_state() == TenantState::Active {
ControlFlow::Continue(())
} else {
let mut tenant_state_updates = tenant.subscribe_for_state_updates();
loop {
match tenant_state_updates.changed().await {
Ok(()) => {
let new_state = &*tenant_state_updates.borrow();
match new_state {
TenantState::Active => {
debug!("Tenant state changed to active, continuing the task loop");
return ControlFlow::Continue(());
}
state => {
debug!("Not running the task loop, tenant is not active: {state:?}");
continue;
}
}
sleep_duration = wait_duration;
}
Err(_sender_dropped_error) => {
return ControlFlow::Break(());
}
}
};
if tokio::time::timeout(sleep_duration, cancel.cancelled())
.await
.is_ok()
{
break;
}
}
}
/// Tenant housekeeping's main loop.
async fn tenant_housekeeping_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
let mut last_throttle_flag_reset_at = Instant::now();
loop {
if wait_for_active_tenant(&tenant, &cancel).await.is_break() {
return;
}
// Use the same period as compaction; it's not worth a separate setting. But if it's set to
// zero (to disable compaction), then use a reasonable default. Jitter it by 5%.
let period = match tenant.get_compaction_period() {
Duration::ZERO => humantime::parse_duration(DEFAULT_COMPACTION_PERIOD).unwrap(),
period => period,
};
let Ok(period) = sleep_jitter(period, period * 5 / 100, &cancel).await else {
break;
};
// Do tenant housekeeping.
let iteration = Iteration {
started_at: Instant::now(),
period,
kind: BackgroundLoopKind::TenantHouseKeeping,
};
iteration.run(tenant.housekeeping()).await;
// Log any getpage throttling.
info_span!(parent: None, "pagestream_throttle", tenant_id=%tenant.tenant_shard_id, shard_id=%tenant.tenant_shard_id.shard_slug()).in_scope(|| {
let now = Instant::now();
let prev = std::mem::replace(&mut last_throttle_flag_reset_at, now);
let Stats { count_accounted_start, count_accounted_finish, count_throttled, sum_throttled_usecs} = tenant.pagestream_throttle.reset_stats();
if count_throttled == 0 {
return;
}
let allowed_rps = tenant.pagestream_throttle.steady_rps();
let delta = now - prev;
info!(
n_seconds=%format_args!("{:.3}", delta.as_secs_f64()),
count_accounted = count_accounted_finish, // don't break existing log scraping
count_throttled,
sum_throttled_usecs,
count_accounted_start, // log after pre-existing fields to not break existing log scraping
allowed_rps=%format_args!("{allowed_rps:.0}"),
"shard was throttled in the last n_seconds"
);
});
}
}
/// Waits until the tenant becomes active, or returns `ControlFlow::Break()` to shut down.
async fn wait_for_active_tenant(
tenant: &Arc<Tenant>,
cancel: &CancellationToken,
) -> ControlFlow<()> {
if tenant.current_state() == TenantState::Active {
return ControlFlow::Continue(());
}
let mut update_rx = tenant.subscribe_for_state_updates();
loop {
tokio::select! {
_ = cancel.cancelled() => return ControlFlow::Break(()),
result = update_rx.changed() => if result.is_err() {
return ControlFlow::Break(());
}
}
match &*update_rx.borrow() {
TenantState::Active => {
debug!("Tenant state changed to active, continuing the task loop");
return ControlFlow::Continue(());
}
state => debug!("Not running the task loop, tenant is not active: {state:?}"),
}
}
}
@@ -504,41 +529,26 @@ async fn wait_for_active_tenant(
#[error("cancelled")]
pub(crate) struct Cancelled;
/// Sleeps for a random interval up to the given max value.
/// Provide a random delay for background task initialization.
///
/// This delay prevents a thundering herd of background tasks and will likely keep them running on
/// different periods for more stable load.
pub(crate) async fn sleep_random(
max: Duration,
pub(crate) async fn random_init_delay(
period: Duration,
cancel: &CancellationToken,
) -> Result<Duration, Cancelled> {
sleep_random_range(Duration::ZERO..=max, cancel).await
}
/// Sleeps for a random interval in the given range. Returns the duration.
pub(crate) async fn sleep_random_range(
interval: RangeInclusive<Duration>,
cancel: &CancellationToken,
) -> Result<Duration, Cancelled> {
let delay = rand::thread_rng().gen_range(interval);
if delay == Duration::ZERO {
return Ok(delay);
) -> Result<(), Cancelled> {
if period == Duration::ZERO {
return Ok(());
}
tokio::select! {
_ = cancel.cancelled() => Err(Cancelled),
_ = tokio::time::sleep(delay) => Ok(delay),
}
}
/// Sleeps for an interval with a random jitter.
pub(crate) async fn sleep_jitter(
duration: Duration,
jitter: Duration,
cancel: &CancellationToken,
) -> Result<Duration, Cancelled> {
let from = duration.saturating_sub(jitter);
let to = duration.saturating_add(jitter);
sleep_random_range(from..=to, cancel).await
let d = {
let mut rng = rand::thread_rng();
rng.gen_range(Duration::ZERO..=period)
};
match tokio::time::timeout(d, cancel.cancelled()).await {
Ok(_) => Err(Cancelled),
Err(_) => Ok(()),
}
}
struct Iteration {
@@ -554,25 +564,42 @@ struct IterationResult<O> {
impl Iteration {
#[instrument(skip_all)]
pub(crate) async fn run<F: Future<Output = O>, O>(self, fut: F) -> IterationResult<O> {
let mut fut = pin!(fut);
pub(crate) async fn run<Fut, O>(self, fut: Fut) -> IterationResult<O>
where
Fut: std::future::Future<Output = O>,
{
let Self {
started_at,
period,
kind,
} = self;
let mut fut = std::pin::pin!(fut);
// Wrap `fut` into a future that logs a message every `period` so that we get a
// very obvious breadcrumb in the logs _while_ a slow iteration is happening.
let output = loop {
match tokio::time::timeout(self.period, &mut fut).await {
Ok(r) => break r,
Err(_) => info!("still running"),
let liveness_logger = async move {
loop {
match tokio::time::timeout(period, &mut fut).await {
Ok(x) => return x,
Err(_) => {
// info level as per the same rationale why warn_when_period_overrun is info
// => https://github.com/neondatabase/neon/pull/5724
info!("still running");
}
}
}
};
let elapsed = self.started_at.elapsed();
warn_when_period_overrun(elapsed, self.period, self.kind);
let output = liveness_logger.await;
let elapsed = started_at.elapsed();
warn_when_period_overrun(elapsed, period, kind);
IterationResult { output, elapsed }
}
}
// NB: the `task` and `period` are used for metrics labels.
/// Attention: the `task` and `period` beocme labels of a pageserver-wide prometheus metric.
pub(crate) fn warn_when_period_overrun(
elapsed: Duration,
period: Duration,
@@ -590,7 +617,7 @@ pub(crate) fn warn_when_period_overrun(
"task iteration took longer than the configured period"
);
crate::metrics::BACKGROUND_LOOP_PERIOD_OVERRUN_COUNT
.with_label_values(&[task.into(), &format!("{}", period.as_secs())])
.with_label_values(&[task.as_static_str(), &format!("{}", period.as_secs())])
.inc();
}
}

View File

@@ -52,7 +52,6 @@ use tokio::{
};
use tokio_util::sync::CancellationToken;
use tracing::*;
use utils::critical;
use utils::rate_limit::RateLimit;
use utils::{
fs_ext,
@@ -532,9 +531,6 @@ impl GcInfo {
pub(super) fn remove_child_offloaded(&mut self, child_id: TimelineId) -> bool {
self.remove_child_maybe_offloaded(child_id, MaybeOffloaded::Yes)
}
pub(crate) fn lsn_covered_by_lease(&self, lsn: Lsn) -> bool {
self.leases.contains_key(&lsn)
}
}
/// The `GcInfo` component describing which Lsns need to be retained. Functionally, this
@@ -1718,9 +1714,8 @@ impl Timeline {
let prepare = async move {
let guard = self.compaction_lock.lock().await;
let permit = super::tasks::acquire_concurrency_permit(
let permit = super::tasks::concurrent_background_tasks_rate_limit_permit(
BackgroundLoopKind::Compaction,
self.conf.use_compaction_semaphore,
ctx,
)
.await;
@@ -2633,7 +2628,7 @@ impl Timeline {
return;
}
FlushLoopState::Exited => {
info!(
warn!(
"ignoring attempt to restart exited flush_loop {}/{}",
self.tenant_shard_id, self.timeline_id
);
@@ -3057,9 +3052,8 @@ impl Timeline {
let self_ref = &self;
let skip_concurrency_limiter = &skip_concurrency_limiter;
async move {
let wait_for_permit = super::tasks::acquire_concurrency_permit(
let wait_for_permit = super::tasks::concurrent_background_tasks_rate_limit_permit(
BackgroundLoopKind::InitialLogicalSizeCalculation,
false,
background_ctx,
);
@@ -5810,11 +5804,10 @@ impl Timeline {
let img = match res {
Ok(img) => img,
Err(walredo::Error::Cancelled) => return Err(PageReconstructError::Cancelled),
Err(walredo::Error::Other(err)) => {
critical!("walredo failure during page reconstruction: {err:?}");
Err(walredo::Error::Other(e)) => {
return Err(PageReconstructError::WalRedo(
err.context("reconstruct a page image"),
));
e.context("reconstruct a page image"),
))
}
};
Ok(img)

View File

@@ -10,8 +10,8 @@ use std::sync::Arc;
use super::layer_manager::LayerManager;
use super::{
CompactFlags, CompactOptions, CreateImageLayersError, DurationRecorder, GetVectoredError,
ImageLayerCreationMode, LastImageLayerCreationStatus, RecordedDuration, Timeline,
CompactFlags, CompactOptions, CreateImageLayersError, DurationRecorder, ImageLayerCreationMode,
LastImageLayerCreationStatus, RecordedDuration, Timeline,
};
use anyhow::{anyhow, bail, Context};
@@ -26,7 +26,6 @@ use pageserver_api::shard::{ShardCount, ShardIdentity, TenantShardId};
use serde::Serialize;
use tokio_util::sync::CancellationToken;
use tracing::{debug, info, info_span, trace, warn, Instrument};
use utils::critical;
use utils::id::TimelineId;
use crate::context::{AccessStatsBehavior, RequestContext, RequestContextBuilder};
@@ -34,7 +33,6 @@ use crate::page_cache;
use crate::statvfs::Statvfs;
use crate::tenant::checks::check_valid_layermap;
use crate::tenant::gc_block::GcBlock;
use crate::tenant::layer_map::LayerMap;
use crate::tenant::remote_timeline_client::WaitCompletionError;
use crate::tenant::storage_layer::batch_split_writer::{
BatchWriterResult, SplitDeltaLayerWriter, SplitImageLayerWriter,
@@ -440,11 +438,6 @@ impl KeyHistoryRetention {
if dry_run {
return true;
}
if LayerMap::is_l0(&key.key_range, key.is_delta) {
// gc-compaction should not produce L0 deltas, otherwise it will break the layer order.
// We should ignore such layers.
return true;
}
let layer_generation;
{
let guard = tline.layers.read().await;
@@ -749,15 +742,7 @@ impl Timeline {
.as_ref()
.clone(),
)
.await
.inspect_err(|err| {
if let CreateImageLayersError::GetVectoredError(
GetVectoredError::MissingKey(_),
) = err
{
critical!("missing key during compaction: {err:?}");
}
})?;
.await?;
self.last_image_layer_creation_status
.store(Arc::new(outcome.clone()));

View File

@@ -341,13 +341,6 @@ impl DeleteTimelineFlow {
let tenant_shard_id = timeline.tenant_shard_id();
let timeline_id = timeline.timeline_id();
// Take a tenant gate guard, because timeline deletion needs access to the tenant to update its manifest.
let Ok(tenant_guard) = tenant.gate.enter() else {
// It is safe to simply skip here, because we only schedule background work once the timeline is durably marked for deletion.
info!("Tenant is shutting down, timeline deletion will be resumed when it next starts");
return;
};
task_mgr::spawn(
task_mgr::BACKGROUND_RUNTIME.handle(),
TaskKind::TimelineDeletionWorker,
@@ -355,8 +348,6 @@ impl DeleteTimelineFlow {
Some(timeline_id),
"timeline_delete",
async move {
let _guard = tenant_guard;
if let Err(err) = Self::background(guard, conf, &tenant, &timeline, remote_client).await {
// Only log as an error if it's not a cancellation.
if matches!(err, DeleteTimelineError::Cancelled) {

View File

@@ -30,11 +30,8 @@ use crate::{
pgdatadir_mapping::CollectKeySpaceError,
task_mgr::{self, TaskKind, BACKGROUND_RUNTIME},
tenant::{
size::CalculateSyntheticSizeError,
storage_layer::LayerVisibilityHint,
tasks::{sleep_random, BackgroundLoopKind, BackgroundLoopSemaphorePermit},
timeline::EvictionError,
LogicalSizeCalculationCause, Tenant,
size::CalculateSyntheticSizeError, storage_layer::LayerVisibilityHint,
tasks::BackgroundLoopKind, timeline::EvictionError, LogicalSizeCalculationCause, Tenant,
},
};
@@ -83,6 +80,8 @@ impl Timeline {
#[instrument(skip_all, fields(tenant_id = %self.tenant_shard_id.tenant_id, shard_id = %self.tenant_shard_id.shard_slug(), timeline_id = %self.timeline_id))]
async fn eviction_task(self: Arc<Self>, tenant: Arc<Tenant>) {
use crate::tenant::tasks::random_init_delay;
// acquire the gate guard only once within a useful span
let Ok(guard) = self.gate.enter() else {
return;
@@ -95,7 +94,7 @@ impl Timeline {
EvictionPolicy::OnlyImitiate(lat) => lat.period,
EvictionPolicy::NoEviction => Duration::from_secs(10),
};
if sleep_random(period, &self.cancel).await.is_err() {
if random_init_delay(period, &self.cancel).await.is_err() {
return;
}
}
@@ -331,10 +330,9 @@ impl Timeline {
&self,
cancel: &CancellationToken,
ctx: &RequestContext,
) -> ControlFlow<(), BackgroundLoopSemaphorePermit<'static>> {
let acquire_permit = crate::tenant::tasks::acquire_concurrency_permit(
) -> ControlFlow<(), tokio::sync::SemaphorePermit<'static>> {
let acquire_permit = crate::tenant::tasks::concurrent_background_tasks_rate_limit_permit(
BackgroundLoopKind::Eviction,
false,
ctx,
);
@@ -376,7 +374,7 @@ impl Timeline {
p: &EvictionPolicyLayerAccessThreshold,
cancel: &CancellationToken,
gate: &GateGuard,
permit: BackgroundLoopSemaphorePermit<'static>,
permit: tokio::sync::SemaphorePermit<'static>,
ctx: &RequestContext,
) -> ControlFlow<()> {
if !self.tenant_shard_id.is_shard_zero() {

View File

@@ -39,7 +39,7 @@ use crate::{
use postgres_backend::is_expected_io_error;
use postgres_connection::PgConnectionConfig;
use postgres_ffi::waldecoder::WalStreamDecoder;
use utils::{critical, id::NodeId, lsn::Lsn, postgres_client::PostgresClientProtocol};
use utils::{id::NodeId, lsn::Lsn, postgres_client::PostgresClientProtocol};
use utils::{pageserver_feedback::PageserverFeedback, sync::gate::GateError};
/// Status of the connection.
@@ -393,13 +393,6 @@ pub(super) async fn handle_walreceiver_connection(
.await
.with_context(|| {
format!("could not ingest record at {local_next_record_lsn}")
})
.inspect_err(|err| {
// TODO: we can't differentiate cancellation errors with
// anyhow::Error, so just ignore it if we're cancelled.
if !cancellation.is_cancelled() {
critical!("{err:?}")
}
})?;
uncommitted_records += 1;
@@ -527,13 +520,6 @@ pub(super) async fn handle_walreceiver_connection(
.await
.with_context(|| {
format!("could not ingest record at {next_record_lsn}")
})
.inspect_err(|err| {
// TODO: we can't differentiate cancellation errors with
// anyhow::Error, so just ignore it if we're cancelled.
if !cancellation.is_cancelled() {
critical!("{err:?}")
}
})?;
if !ingested {
tracing::debug!("ingest: filtered out record @ LSN {next_record_lsn}");

View File

@@ -1157,7 +1157,6 @@ impl WalIngest {
// See also the neon code changes in the InitWalRecovery() function.
if xlog_checkpoint.oldestActiveXid == pg_constants::INVALID_TRANSACTION_ID
&& info == pg_constants::XLOG_CHECKPOINT_SHUTDOWN
&& self.shard.is_shard_zero()
{
let oldest_active_xid = if pg_version >= 17 {
let mut oldest_active_full_xid = cp.nextXid.value;

View File

@@ -79,14 +79,6 @@ impl WalRedoProcess {
.env_clear()
.env("LD_LIBRARY_PATH", &pg_lib_dir_path)
.env("DYLD_LIBRARY_PATH", &pg_lib_dir_path)
.env(
"ASAN_OPTIONS",
std::env::var("ASAN_OPTIONS").unwrap_or_default(),
)
.env(
"UBSAN_OPTIONS",
std::env::var("UBSAN_OPTIONS").unwrap_or_default(),
)
// NB: The redo process is not trusted after we sent it the first
// walredo work. Before that, it is trusted. Specifically, we trust
// it to

View File

@@ -220,10 +220,8 @@ lfc_maybe_disabled(void)
static bool
lfc_ensure_opened(void)
{
bool enabled = !lfc_maybe_disabled();
/* Open cache file if not done yet */
if (lfc_desc <= 0 && enabled)
if (lfc_desc <= 0)
{
lfc_desc = BasicOpenFile(lfc_path, O_RDWR);
@@ -233,7 +231,7 @@ lfc_ensure_opened(void)
return false;
}
}
return enabled;
return true;
}
static void
@@ -338,10 +336,11 @@ lfc_change_limit_hook(int newval, void *extra)
{
uint32 new_size = SIZE_MB_TO_CHUNKS(newval);
if (!is_normal_backend())
if (!lfc_ctl || !is_normal_backend())
return;
if (!lfc_ensure_opened())
/* Open LFC file only if LFC was enabled or we are going to reenable it */
if ((newval > 0 || LFC_ENABLED()) && !lfc_ensure_opened())
return;
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
@@ -509,44 +508,47 @@ lfc_cache_containsv(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
CriticalAssert(BufTagGetRelNumber(&tag) != InvalidRelFileNumber);
tag.blockNum = blkno & ~(BLOCKS_PER_CHUNK - 1);
tag.blockNum = (blkno + i) & ~(BLOCKS_PER_CHUNK - 1);
hash = get_hash_value(lfc_hash, &tag);
chunk_offs = blkno & (BLOCKS_PER_CHUNK - 1);
chunk_offs = (blkno + i) & (BLOCKS_PER_CHUNK - 1);
LWLockAcquire(lfc_lock, LW_SHARED);
if (!LFC_ENABLED())
{
LWLockRelease(lfc_lock);
return 0;
}
while (true)
{
int this_chunk = Min(nblocks - i, BLOCKS_PER_CHUNK - chunk_offs);
entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_FIND, NULL);
if (entry != NULL)
int this_chunk = Min(nblocks, BLOCKS_PER_CHUNK - chunk_offs);
if (LFC_ENABLED())
{
for (; chunk_offs < BLOCKS_PER_CHUNK && i < nblocks; chunk_offs++, i++)
entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_FIND, NULL);
if (entry != NULL)
{
if ((entry->bitmap[chunk_offs >> 5] &
((uint32)1 << (chunk_offs & 31))) != 0)
for (; chunk_offs < BLOCKS_PER_CHUNK && i < nblocks; chunk_offs++, i++)
{
BITMAP_SET(bitmap, i);
found++;
if ((entry->bitmap[chunk_offs >> 5] &
((uint32)1 << (chunk_offs & 31))) != 0)
{
BITMAP_SET(bitmap, i);
found++;
}
}
}
else
{
i += this_chunk;
}
}
else
{
i += this_chunk;
LWLockRelease(lfc_lock);
return found;
}
/*
* Break out of the iteration before doing expensive stuff for
* a next iteration
*/
if (i >= nblocks)
if (i + 1 >= nblocks)
break;
/*
@@ -560,8 +562,8 @@ lfc_cache_containsv(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
LWLockRelease(lfc_lock);
#ifdef USE_ASSERT_CHECKING
{
#if USE_ASSERT_CHECKING
do {
int count = 0;
for (int j = 0; j < nblocks; j++)
@@ -571,7 +573,7 @@ lfc_cache_containsv(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
}
Assert(count == found);
}
} while (false);
#endif
return found;

View File

@@ -36,11 +36,6 @@
#include "pagestore_client.h"
#include "walproposer.h"
#ifdef __linux__
#include <sys/ioctl.h>
#include <linux/sockios.h>
#endif
#define PageStoreTrace DEBUG5
#define MIN_RECONNECT_INTERVAL_USEC 1000
@@ -733,36 +728,11 @@ retry:
INSTR_TIME_SUBTRACT(since_last_log, last_log_ts);
if (INSTR_TIME_GET_MILLISEC(since_last_log) >= LOG_INTERVAL_MS)
{
int sndbuf = -1;
int recvbuf = -1;
#ifdef __linux__
int socketfd;
#endif
since_start = now;
INSTR_TIME_SUBTRACT(since_start, start_ts);
#ifdef __linux__
/*
* get kernel's send and recv queue size via ioctl
* https://elixir.bootlin.com/linux/v6.1.128/source/include/uapi/linux/sockios.h#L25-L27
*/
socketfd = PQsocket(pageserver_conn);
if (socketfd != -1) {
int ioctl_err;
ioctl_err = ioctl(socketfd, SIOCOUTQ, &sndbuf);
if (ioctl_err!= 0) {
sndbuf = -errno;
}
ioctl_err = ioctl(socketfd, FIONREAD, &recvbuf);
if (ioctl_err != 0) {
recvbuf = -errno;
}
}
#endif
neon_shard_log(shard_no, LOG, "no response received from pageserver for %0.3f s, still waiting (sent " UINT64_FORMAT " requests, received " UINT64_FORMAT " responses) (socket sndbuf=%d recvbuf=%d)",
neon_shard_log(shard_no, LOG, "no response received from pageserver for %0.3f s, still waiting (sent " UINT64_FORMAT " requests, received " UINT64_FORMAT " responses)",
INSTR_TIME_GET_DOUBLE(since_start),
shard->nrequests_sent, shard->nresponses_received, sndbuf, recvbuf);
shard->nrequests_sent, shard->nresponses_received);
last_log_ts = now;
logged = true;
}

View File

@@ -916,7 +916,7 @@ prefetch_register_bufferv(BufferTag tag, neon_request_lsns *frlsns,
{
uint64 min_ring_index;
PrefetchRequest hashkey;
#ifdef USE_ASSERT_CHECKING
#if USE_ASSERT_CHECKING
bool any_hits = false;
#endif
/* We will never read further ahead than our buffer can store. */
@@ -955,7 +955,7 @@ Retry:
else
lsns = NULL;
#ifdef USE_ASSERT_CHECKING
#if USE_ASSERT_CHECKING
any_hits = true;
#endif
@@ -3011,7 +3011,7 @@ neon_read_at_lsnv(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber base_block
start_ts = GetCurrentTimestamp();
if (RecoveryInProgress() && MyBackendType != B_STARTUP)
XLogWaitForReplayOf(reqlsns->request_lsn);
XLogWaitForReplayOf(reqlsns[0].request_lsn);
/*
* Try to find prefetched page in the list of received pages.

View File

@@ -37,8 +37,8 @@ To play with it locally one may start proxy over a local postgres installation
If both postgres and proxy are running you may send a SQL query:
```console
curl -k -X POST 'https://proxy.local.neon.build:4444/sql' \
-H 'Neon-Connection-String: postgres://stas:pass@proxy.local.neon.build:4444/postgres' \
curl -k -X POST 'https://proxy.localtest.me:4444/sql' \
-H 'Neon-Connection-String: postgres://stas:pass@proxy.localtest.me:4444/postgres' \
-H 'Content-Type: application/json' \
--data '{
"query":"SELECT $1::int[] as arr, $2::jsonb as obj, 42 as num",
@@ -104,7 +104,7 @@ cases where it is hard to use rows represented as objects (e.g. when several fie
## Test proxy locally
Proxy determines project name from the subdomain, request to the `round-rice-566201.somedomain.tld` will be routed to the project named `round-rice-566201`. Unfortunately, `/etc/hosts` does not support domain wildcards, so we can use *.local.neon.build` which resolves to `127.0.0.1`.
Proxy determines project name from the subdomain, request to the `round-rice-566201.somedomain.tld` will be routed to the project named `round-rice-566201`. Unfortunately, `/etc/hosts` does not support domain wildcards, so we can use *.localtest.me` which resolves to `127.0.0.1`.
We will need to have a postgres instance. Assuming that we have set up docker we can set it up as follows:
```sh
@@ -125,7 +125,7 @@ docker exec -it proxy-postgres psql -U postgres -c "CREATE ROLE proxy WITH SUPER
Let's create self-signed certificate by running:
```sh
openssl req -new -x509 -days 365 -nodes -text -out server.crt -keyout server.key -subj "/CN=*.local.neon.build"
openssl req -new -x509 -days 365 -nodes -text -out server.crt -keyout server.key -subj "/CN=*.localtest.me"
```
Then we need to build proxy with 'testing' feature and run, e.g.:
@@ -136,5 +136,5 @@ RUST_LOG=proxy cargo run -p proxy --bin proxy --features testing -- --auth-backe
Now from client you can start a new session:
```sh
PGSSLROOTCERT=./server.crt psql "postgresql://proxy:password@endpoint.local.neon.build:4432/postgres?sslmode=verify-full"
PGSSLROOTCERT=./server.crt psql "postgresql://proxy:password@endpoint.localtest.me:4432/postgres?sslmode=verify-full"
```

View File

@@ -69,35 +69,17 @@ pub async fn handle_cancel_messages(
value,
resp_tx,
_guard,
expire,
expire: _,
} => {
let res = client.hset(&key, field, value).await;
if let Some(resp_tx) = resp_tx {
if res.is_ok() {
resp_tx
.send(client.expire(key, expire).await)
.inspect_err(|e| {
tracing::debug!(
"failed to send StoreCancelKey response: {:?}",
e
);
})
.ok();
} else {
resp_tx
.send(res)
.inspect_err(|e| {
tracing::debug!(
"failed to send StoreCancelKey response: {:?}",
e
);
})
.ok();
}
} else if res.is_ok() {
drop(client.expire(key, expire).await);
resp_tx
.send(client.hset(key, field, value).await)
.inspect_err(|e| {
tracing::debug!("failed to send StoreCancelKey response: {:?}", e);
})
.ok();
} else {
tracing::warn!("failed to store cancel key: {:?}", res);
drop(client.hset(key, field, value).await);
}
}
CancelKeyOp::GetCancelData {
@@ -454,7 +436,7 @@ impl Session {
&self.key
}
// Send the store key op to the cancellation handler and set TTL for the key
// Send the store key op to the cancellation handler
pub(crate) async fn write_cancel_key(
&self,
cancel_closure: CancelClosure,

View File

@@ -15,8 +15,7 @@ use tokio::io::{AsyncRead, AsyncWrite};
use tokio::sync::mpsc::error::SendError;
use tokio::task::JoinHandle;
use tokio::time::MissedTickBehavior;
use tracing::{error, info, info_span, Instrument};
use utils::critical;
use tracing::{info_span, Instrument};
use utils::lsn::Lsn;
use utils::postgres_client::Compression;
use utils::postgres_client::InterpretedFormat;
@@ -121,20 +120,6 @@ pub enum InterpretedWalReaderError {
WalStreamClosed,
}
enum CurrentPositionUpdate {
Reset(Lsn),
NotReset(Lsn),
}
impl CurrentPositionUpdate {
fn current_position(&self) -> Lsn {
match self {
CurrentPositionUpdate::Reset(lsn) => *lsn,
CurrentPositionUpdate::NotReset(lsn) => *lsn,
}
}
}
impl InterpretedWalReaderState {
fn current_position(&self) -> Option<Lsn> {
match self {
@@ -144,26 +129,6 @@ impl InterpretedWalReaderState {
InterpretedWalReaderState::Done => None,
}
}
// Reset the current position of the WAL reader if the requested starting position
// of the new shard is smaller than the current value.
fn maybe_reset(&mut self, new_shard_start_pos: Lsn) -> CurrentPositionUpdate {
match self {
InterpretedWalReaderState::Running {
current_position, ..
} => {
if new_shard_start_pos < *current_position {
*current_position = new_shard_start_pos;
CurrentPositionUpdate::Reset(*current_position)
} else {
CurrentPositionUpdate::NotReset(*current_position)
}
}
InterpretedWalReaderState::Done => {
panic!("maybe_reset called on finished reader")
}
}
}
}
pub(crate) struct AttachShardNotification {
@@ -214,10 +179,11 @@ impl InterpretedWalReader {
metric.dec();
}
reader
.run_impl(start_pos)
.await
.inspect_err(|err| critical!("failed to read WAL record: {err:?}"))
let res = reader.run_impl(start_pos).await;
if let Err(ref err) = res {
tracing::error!("Task finished with error: {err}");
}
res
}
.instrument(info_span!("interpreted wal reader")),
);
@@ -273,10 +239,11 @@ impl InterpretedWalReader {
metric.dec();
}
if let Err(err) = self.run_impl(start_pos).await {
critical!("failed to read WAL record: {err:?}");
let res = self.run_impl(start_pos).await;
if let Err(err) = res {
tracing::error!("Interpreted wal reader encountered error: {err}");
} else {
info!("interpreted wal reader exiting");
tracing::info!("Interpreted wal reader exiting");
}
Err(CopyStreamHandlerEnd::Other(anyhow!(
@@ -443,24 +410,15 @@ impl InterpretedWalReader {
};
senders.push(ShardSenderState { sender_id: new_sender_id, tx: sender, next_record_lsn: start_pos});
// If the shard is subscribing below the current position the we need
// to update the cursor that tracks where we are at in the WAL
// ([`Self::state`]) and reset the WAL stream itself
// (`[Self::wal_stream`]). This must be done atomically from the POV of
// anything outside the select statement.
let position_reset = self.state.write().unwrap().maybe_reset(start_pos);
match position_reset {
CurrentPositionUpdate::Reset(to) => {
self.wal_stream.reset(to).await;
wal_decoder = WalStreamDecoder::new(to, self.pg_version);
},
CurrentPositionUpdate::NotReset(_) => {}
};
let current_pos = self.state.read().unwrap().current_position().unwrap();
if start_pos < current_pos {
self.wal_stream.reset(start_pos).await;
wal_decoder = WalStreamDecoder::new(start_pos, self.pg_version);
}
tracing::info!(
"Added shard sender {} with start_pos={} current_pos={}",
ShardSenderId::new(shard_id, new_sender_id), start_pos, position_reset.current_position()
ShardSenderId::new(shard_id, new_sender_id), start_pos, current_pos
);
}
}
@@ -626,7 +584,7 @@ mod tests {
.unwrap();
let resident_tli = tli.wal_residence_guard().await.unwrap();
let end_watch = Env::write_wal(tli, start_lsn, SIZE, MSG_COUNT, None)
let end_watch = Env::write_wal(tli, start_lsn, SIZE, MSG_COUNT)
.await
.unwrap();
let end_pos = end_watch.get();
@@ -757,6 +715,7 @@ mod tests {
const MSG_COUNT: usize = 200;
const PG_VERSION: u32 = 17;
const SHARD_COUNT: u8 = 2;
const ATTACHED_SHARDS: u8 = 4;
let start_lsn = Lsn::from_str("0/149FD18").unwrap();
let env = Env::new(true).unwrap();
@@ -766,11 +725,9 @@ mod tests {
.unwrap();
let resident_tli = tli.wal_residence_guard().await.unwrap();
let mut next_record_lsns = Vec::default();
let end_watch =
Env::write_wal(tli, start_lsn, SIZE, MSG_COUNT, Some(&mut next_record_lsns))
.await
.unwrap();
let end_watch = Env::write_wal(tli, start_lsn, SIZE, MSG_COUNT)
.await
.unwrap();
let end_pos = end_watch.get();
let streaming_wal_reader = StreamingWalReader::new(
@@ -789,71 +746,38 @@ mod tests {
)
.unwrap();
struct Sender {
tx: Option<tokio::sync::mpsc::Sender<Batch>>,
rx: tokio::sync::mpsc::Receiver<Batch>,
shard: ShardIdentity,
start_lsn: Lsn,
received_next_record_lsns: Vec<Lsn>,
}
let (tx, rx) = tokio::sync::mpsc::channel::<Batch>(MSG_COUNT * 2);
let mut batch_receivers = vec![rx];
impl Sender {
fn new(start_lsn: Lsn, shard: ShardIdentity) -> Self {
let (tx, rx) = tokio::sync::mpsc::channel::<Batch>(MSG_COUNT * 2);
Self {
tx: Some(tx),
rx,
shard,
start_lsn,
received_next_record_lsns: Vec::default(),
}
}
}
assert!(next_record_lsns.len() > 7);
let start_lsns = vec![
next_record_lsns[5],
next_record_lsns[1],
next_record_lsns[3],
];
let mut senders = start_lsns
.into_iter()
.map(|lsn| Sender::new(lsn, shard_0))
.collect::<Vec<_>>();
let first_sender = senders.first_mut().unwrap();
let handle = InterpretedWalReader::spawn(
streaming_wal_reader,
first_sender.start_lsn,
first_sender.tx.take().unwrap(),
first_sender.shard,
start_lsn,
tx,
shard_0,
PG_VERSION,
&Some("pageserver".to_string()),
);
for sender in senders.iter_mut().skip(1) {
handle
.fanout(sender.shard, sender.tx.take().unwrap(), sender.start_lsn)
.unwrap();
for _ in 0..(ATTACHED_SHARDS - 1) {
let (tx, rx) = tokio::sync::mpsc::channel::<Batch>(MSG_COUNT * 2);
handle.fanout(shard_0, tx, start_lsn).unwrap();
batch_receivers.push(rx);
}
for sender in senders.iter_mut() {
loop {
let batch = sender.rx.recv().await.unwrap();
tracing::info!(
"Sender with start_lsn={} received batch ending at {} with {} records",
sender.start_lsn,
batch.wal_end_lsn,
batch.records.records.len()
loop {
let batch = batch_receivers.first_mut().unwrap().recv().await.unwrap();
for rx in batch_receivers.iter_mut().skip(1) {
let other_batch = rx.recv().await.unwrap();
assert_eq!(batch.wal_end_lsn, other_batch.wal_end_lsn);
assert_eq!(
batch.available_wal_end_lsn,
other_batch.available_wal_end_lsn
);
}
for rec in batch.records.records {
sender.received_next_record_lsns.push(rec.next_record_lsn);
}
if batch.wal_end_lsn == batch.available_wal_end_lsn {
break;
}
if batch.wal_end_lsn == batch.available_wal_end_lsn {
break;
}
}
@@ -868,20 +792,5 @@ mod tests {
}
assert!(done);
for sender in senders {
tracing::info!(
"Validating records received by sender with start_lsn={}",
sender.start_lsn
);
assert!(sender.received_next_record_lsns.is_sorted());
let expected = next_record_lsns
.iter()
.filter(|lsn| **lsn > sender.start_lsn)
.copied()
.collect::<Vec<_>>();
assert_eq!(sender.received_next_record_lsns, expected);
}
}
}

View File

@@ -122,7 +122,6 @@ impl Env {
start_lsn: Lsn,
msg_size: usize,
msg_count: usize,
mut next_record_lsns: Option<&mut Vec<Lsn>>,
) -> anyhow::Result<EndWatch> {
let (msg_tx, msg_rx) = tokio::sync::mpsc::channel(receive_wal::MSG_QUEUE_SIZE);
let (reply_tx, mut reply_rx) = tokio::sync::mpsc::channel(receive_wal::REPLY_QUEUE_SIZE);
@@ -131,7 +130,7 @@ impl Env {
WalAcceptor::spawn(tli.wal_residence_guard().await?, msg_rx, reply_tx, Some(0));
let prefix = c"neon-file:";
let prefix = c"p";
let prefixlen = prefix.to_bytes_with_nul().len();
assert!(msg_size >= prefixlen);
let message = vec![0; msg_size - prefixlen];
@@ -140,9 +139,6 @@ impl Env {
&mut WalGenerator::new(LogicalMessageGenerator::new(prefix, &message), start_lsn);
for _ in 0..msg_count {
let (lsn, record) = walgen.next().unwrap();
if let Some(ref mut lsns) = next_record_lsns {
lsns.push(lsn);
}
let req = AppendRequest {
h: AppendRequestHeader {

View File

@@ -592,8 +592,6 @@ impl Timeline {
assert!(self.cancel.is_cancelled());
assert!(self.gate.close_complete());
info!("deleting timeline {} from disk", self.ttid);
// Close associated FDs. Nobody will be able to touch timeline data once
// it is cancelled, so WAL storage won't be opened again.
shared_state.sk.close_wal_store();

View File

@@ -475,8 +475,6 @@ impl GlobalTimelines {
info!("deleting timeline {}, only_local={}", ttid, only_local);
timeline.shutdown().await;
info!("timeline {ttid} shut down for deletion");
// Take a lock and finish the deletion holding this mutex.
let mut shared_state = timeline.write_shared_state().await;

View File

@@ -246,7 +246,7 @@ mod tests {
.unwrap();
let resident_tli = tli.wal_residence_guard().await.unwrap();
let end_watch = Env::write_wal(tli, start_lsn, SIZE, MSG_COUNT, None)
let end_watch = Env::write_wal(tli, start_lsn, SIZE, MSG_COUNT)
.await
.unwrap();
let end_pos = end_watch.get();

View File

@@ -32,7 +32,6 @@ CREATE TABLE IF NOT EXISTS results (
flaky BOOLEAN NOT NULL,
arch arch DEFAULT 'X64',
lfc BOOLEAN DEFAULT false NOT NULL,
sanitizers BOOLEAN DEFAULT false NOT NULL,
build_type TEXT NOT NULL,
pg_version INT NOT NULL,
run_id BIGINT NOT NULL,
@@ -40,7 +39,7 @@ CREATE TABLE IF NOT EXISTS results (
reference TEXT NOT NULL,
revision CHAR(40) NOT NULL,
raw JSONB COMPRESSION lz4 NOT NULL,
UNIQUE (parent_suite, suite, name, arch, lfc, sanitizers, build_type, pg_version, started_at, stopped_at, run_id)
UNIQUE (parent_suite, suite, name, arch, build_type, pg_version, started_at, stopped_at, run_id)
);
"""
@@ -57,7 +56,6 @@ class Row:
flaky: bool
arch: str
lfc: bool
sanitizers: bool
build_type: str
pg_version: int
run_id: int
@@ -137,7 +135,6 @@ def ingest_test_result(
}
arch = parameters.get("arch", "UNKNOWN").strip("'")
lfc = parameters.get("lfc", "without-lfc").strip("'") == "with-lfc"
sanitizers = parameters.get("sanitizers", "disabled").strip("'") == "enabled"
build_type, pg_version, unparametrized_name = parse_test_name(test["name"])
labels = {label["name"]: label["value"] for label in test["labels"]}
@@ -152,7 +149,6 @@ def ingest_test_result(
flaky=test["flaky"] or test["retriesStatusChange"],
arch=arch,
lfc=lfc,
sanitizers=sanitizers,
build_type=build_type,
pg_version=pg_version,
run_id=run_id,

View File

@@ -3345,7 +3345,7 @@ class NeonProxy(PgProtocol):
metric_collection_interval: str | None = None,
):
host = "127.0.0.1"
domain = "proxy.local.neon.build" # resolves to 127.0.0.1
domain = "proxy.localtest.me" # resolves to 127.0.0.1
super().__init__(dsn=auth_backend.default_conn_url, host=domain, port=proxy_port)
self.domain = domain
@@ -3368,7 +3368,7 @@ class NeonProxy(PgProtocol):
# generate key of it doesn't exist
crt_path = self.test_output_dir / "proxy.crt"
key_path = self.test_output_dir / "proxy.key"
generate_proxy_tls_certs("*.local.neon.build", key_path, crt_path)
generate_proxy_tls_certs("*.localtest.me", key_path, crt_path)
args = [
str(self.neon_binpath / "proxy"),
@@ -3569,7 +3569,7 @@ class NeonAuthBroker:
external_http_port: int,
auth_backend: NeonAuthBroker.ProxyV1,
):
self.domain = "apiauth.local.neon.build" # resolves to 127.0.0.1
self.domain = "apiauth.localtest.me" # resolves to 127.0.0.1
self.host = "127.0.0.1"
self.http_port = http_port
self.external_http_port = external_http_port
@@ -3586,7 +3586,7 @@ class NeonAuthBroker:
# generate key of it doesn't exist
crt_path = self.test_output_dir / "proxy.crt"
key_path = self.test_output_dir / "proxy.key"
generate_proxy_tls_certs("apiauth.local.neon.build", key_path, crt_path)
generate_proxy_tls_certs("apiauth.localtest.me", key_path, crt_path)
args = [
str(self.neon_binpath / "proxy"),
@@ -5122,14 +5122,12 @@ def wait_for_last_flush_lsn(
timeline: TimelineId,
pageserver_id: int | None = None,
auth_token: str | None = None,
last_flush_lsn: Lsn | None = None,
) -> Lsn:
"""Wait for pageserver to catch up the latest flush LSN, returns the last observed lsn."""
shards = tenant_get_shards(env, tenant, pageserver_id)
if last_flush_lsn is None:
last_flush_lsn = Lsn(endpoint.safe_psql("SELECT pg_current_wal_flush_lsn()")[0][0])
last_flush_lsn = Lsn(endpoint.safe_psql("SELECT pg_current_wal_flush_lsn()")[0][0])
results = []
for tenant_shard_id, pageserver in shards:

View File

@@ -124,8 +124,5 @@ def pytest_runtest_makereport(*args, **kwargs):
allure.dynamic.parameter(
"__lfc", "with-lfc" if os.getenv("USE_LFC") != "false" else "without-lfc"
)
allure.dynamic.parameter(
"__sanitizers", "enabled" if os.getenv("SANITIZERS") == "enabled" else "disabled"
)
yield

View File

@@ -282,35 +282,18 @@ class S3Storage:
def timeline_path(self, tenant_id: TenantShardId | TenantId, timeline_id: TimelineId) -> str:
return f"{self.tenant_path(tenant_id)}/timelines/{timeline_id}"
def get_latest_generation_key(self, prefix: str, suffix: str, keys: list[str]) -> str:
"""
Gets the latest generation key from a list of keys.
@param index_keys: A list of keys of different generations, which start with `prefix`
"""
def parse_gen(key: str) -> int:
shortname = key.split("/")[-1]
generation_str = shortname.removeprefix(prefix).removesuffix(suffix)
try:
return int(generation_str, base=16)
except ValueError:
log.info(f"Ignoring non-matching key: {key}")
return -1
if len(keys) == 0:
raise IndexError("No keys found")
return max(keys, key=parse_gen)
def get_latest_index_key(self, index_keys: list[str]) -> str:
"""
Gets the latest index file key.
@param index_keys: A list of index keys of different generations.
"""
key = self.get_latest_generation_key(prefix="index_part.json-", suffix="", keys=index_keys)
return key
def parse_gen(index_key: str) -> int:
parts = index_key.split("index_part.json-")
return int(parts[-1], base=16) if len(parts) == 2 else -1
return max(index_keys, key=parse_gen)
def download_index_part(self, index_key: str) -> IndexPartDump:
"""
@@ -323,29 +306,6 @@ class S3Storage:
log.info(f"index_part.json: {body}")
return IndexPartDump.from_json(json.loads(body))
def download_tenant_manifest(self, tenant_id: TenantId) -> dict[str, Any] | None:
tenant_prefix = self.tenant_path(tenant_id)
objects = self.client.list_objects_v2(Bucket=self.bucket_name, Prefix=f"{tenant_prefix}/")[
"Contents"
]
keys = [obj["Key"] for obj in objects if obj["Key"].find("tenant-manifest") != -1]
try:
manifest_key = self.get_latest_generation_key("tenant-manifest-", ".json", keys)
except IndexError:
log.info(
f"No manifest found for tenant {tenant_id}, this is normal if it didn't offload anything yet"
)
return None
response = self.client.get_object(Bucket=self.bucket_name, Key=manifest_key)
body = response["Body"].read().decode("utf-8")
log.info(f"Downloaded manifest {manifest_key}: {body}")
manifest = json.loads(body)
assert isinstance(manifest, dict)
return manifest
def heatmap_key(self, tenant_id: TenantId) -> str:
return f"{self.tenant_path(tenant_id)}/{TENANT_HEATMAP_FILE_NAME}"

View File

@@ -76,9 +76,6 @@ def test_ingest_logical_message(
log.info("Waiting for Pageserver to catch up")
wait_for_last_record_lsn(client, env.initial_tenant, env.initial_timeline, end_lsn)
recover_to_lsn = Lsn(endpoint.safe_psql("select pg_current_wal_lsn()")[0][0])
endpoint.stop()
# Now that all data is ingested, delete and recreate the tenant in the pageserver. This will
# reingest all the WAL from the safekeeper without any other constraints. This gives us a
# baseline of how fast the pageserver can ingest this WAL in isolation.
@@ -91,13 +88,7 @@ def test_ingest_logical_message(
with zenbenchmark.record_duration("pageserver_recover_ingest"):
log.info("Recovering WAL into pageserver")
client.timeline_create(env.pg_version, env.initial_tenant, env.initial_timeline)
wait_for_last_flush_lsn(
env, endpoint, env.initial_tenant, env.initial_timeline, last_flush_lsn=recover_to_lsn
)
# Check endpoint can start, i.e. we really recovered
endpoint.start()
wait_for_last_flush_lsn(env, endpoint, env.initial_tenant, env.initial_timeline)
wait_for_last_flush_lsn(env, endpoint, env.initial_tenant, env.initial_timeline)
# Emit metrics.
wal_written_mb = round((end_lsn - start_lsn) / (1024 * 1024))

View File

@@ -136,7 +136,7 @@ def run_command_and_log_output(command, log_file_path: Path):
"LD_LIBRARY_PATH": f"{os.getenv('PGCOPYDB_LIB_PATH')}:{os.getenv('PG_16_LIB_PATH')}",
"PGCOPYDB_SOURCE_PGURI": cast(str, os.getenv("BENCHMARK_INGEST_SOURCE_CONNSTR")),
"PGCOPYDB_TARGET_PGURI": cast(str, os.getenv("BENCHMARK_INGEST_TARGET_CONNSTR")),
"PGOPTIONS": "-c idle_in_transaction_session_timeout=0 -c maintenance_work_mem=8388608 -c max_parallel_maintenance_workers=7",
"PGOPTIONS": "-c maintenance_work_mem=8388608 -c max_parallel_maintenance_workers=7",
}
# Combine the current environment with custom variables
env = os.environ.copy()

View File

@@ -314,10 +314,7 @@ def test_forward_compatibility(
def check_neon_works(env: NeonEnv, test_output_dir: Path, sql_dump_path: Path, repo_dir: Path):
ep = env.endpoints.create("main")
ep_env = {"LD_LIBRARY_PATH": str(env.pg_distrib_dir / f"v{env.pg_version}/lib")}
ep.start(env=ep_env)
ep = env.endpoints.create_start("main")
connstr = ep.connstr()
pg_bin = PgBin(test_output_dir, env.pg_distrib_dir, env.pg_version)
@@ -366,7 +363,7 @@ def check_neon_works(env: NeonEnv, test_output_dir: Path, sql_dump_path: Path, r
)
# Timeline exists again: restart the endpoint
ep.start(env=ep_env)
ep.start()
pg_bin.run_capture(
["pg_dumpall", f"--dbname={connstr}", f"--file={test_output_dir / 'dump-from-wal.sql'}"]
@@ -474,14 +471,6 @@ HISTORIC_DATA_SETS = [
PgVersion.V16,
"https://neon-github-public-dev.s3.eu-central-1.amazonaws.com/compatibility-data-snapshots/2024-07-18-pgv16.tar.zst",
),
# This dataset created on a pageserver running modern code at time of capture, but configured with no generation. This
# is our regression test that we can load data written without generations in layer file names & indices
HistoricDataSet(
"2025-02-07-nogenerations",
TenantId("e1411ca6562d6ff62419f693a5695d67"),
PgVersion.V17,
"https://neon-github-public-dev.s3.eu-central-1.amazonaws.com/compatibility-data-snapshots/2025-02-07-pgv17-nogenerations.tar.zst",
),
]

View File

@@ -172,7 +172,7 @@ def test_issue_5878(neon_env_builder: NeonEnvBuilder, attach_mode: str):
# force removal of layers from the future
tenant_conf = ps_http.tenant_config(tenant_id)
generation_before_detach = get_generation_number()
env.pageserver.http_client().tenant_detach(tenant_id)
env.pageserver.tenant_detach(tenant_id)
failpoint_deletion_queue = "deletion-queue-before-execute-pause"
ps_http.configure_failpoints((failpoint_deletion_queue, "pause"))

View File

@@ -12,6 +12,7 @@ of the pageserver are:
from __future__ import annotations
import os
import re
import time
from enum import StrEnum
@@ -28,6 +29,7 @@ from fixtures.pageserver.common_types import parse_layer_file_name
from fixtures.pageserver.http import PageserverApiException
from fixtures.pageserver.utils import (
assert_tenant_state,
list_prefix,
wait_for_last_record_lsn,
wait_for_upload,
)
@@ -122,6 +124,109 @@ def assert_deletion_queue(ps_http, size_fn) -> None:
assert size_fn(v) is True
def test_generations_upgrade(neon_env_builder: NeonEnvBuilder):
"""
Validate behavior when a pageserver is run without generation support enabled,
then started again after activating it:
- Before upgrade, no objects should have generation suffixes
- After upgrade, the bucket should contain a mixture.
- In both cases, postgres I/O should work.
"""
neon_env_builder.enable_pageserver_remote_storage(
RemoteStorageKind.MOCK_S3,
)
env = neon_env_builder.init_configs()
env.broker.start()
for sk in env.safekeepers:
sk.start()
env.storage_controller.start()
# We will start a pageserver with no control_plane_api set, so it won't be able to self-register
env.storage_controller.node_register(env.pageserver)
def remove_control_plane_api_field(config):
return config.pop("control_plane_api")
control_plane_api = env.pageserver.edit_config_toml(remove_control_plane_api_field)
env.pageserver.start()
env.storage_controller.node_configure(env.pageserver.id, {"availability": "Active"})
env.create_tenant(
tenant_id=env.initial_tenant, conf=TENANT_CONF, timeline_id=env.initial_timeline
)
generate_uploads_and_deletions(env, pageserver=env.pageserver)
def parse_generation_suffix(key):
m = re.match(".+-([0-9a-zA-Z]{8})$", key)
if m is None:
return None
else:
log.info(f"match: {m}")
log.info(f"group: {m.group(1)}")
return int(m.group(1), 16)
assert neon_env_builder.pageserver_remote_storage is not None
pre_upgrade_keys = list(
[
o["Key"]
for o in list_prefix(neon_env_builder.pageserver_remote_storage, delimiter="")[
"Contents"
]
]
)
for key in pre_upgrade_keys:
assert parse_generation_suffix(key) is None
env.pageserver.stop()
# Starting without the override that disabled control_plane_api
env.pageserver.patch_config_toml_nonrecursive(
{
"control_plane_api": control_plane_api,
}
)
env.pageserver.start()
generate_uploads_and_deletions(env, pageserver=env.pageserver, init=False)
legacy_objects: list[str] = []
suffixed_objects = []
post_upgrade_keys = list(
[
o["Key"]
for o in list_prefix(neon_env_builder.pageserver_remote_storage, delimiter="")[
"Contents"
]
]
)
for key in post_upgrade_keys:
log.info(f"post-upgrade key: {key}")
if parse_generation_suffix(key) is not None:
suffixed_objects.append(key)
else:
legacy_objects.append(key)
# Bucket now contains a mixture of suffixed and non-suffixed objects
assert len(suffixed_objects) > 0
assert len(legacy_objects) > 0
# Flush through deletions to get a clean state for scrub: we are implicitly validating
# that our generations-enabled pageserver was able to do deletions of layers
# from earlier which don't have a generation.
env.pageserver.http_client().deletion_queue_flush(execute=True)
assert get_deletion_queue_unexpected_errors(env.pageserver.http_client()) == 0
# Having written a mixture of generation-aware and legacy index_part.json,
# ensure the scrubber handles the situation as expected.
healthy, metadata_summary = env.storage_scrubber.scan_metadata()
assert metadata_summary["tenant_count"] == 1 # Scrubber should have seen our timeline
assert metadata_summary["timeline_count"] == 1
assert metadata_summary["timeline_shard_count"] == 1
assert healthy
def test_deferred_deletion(neon_env_builder: NeonEnvBuilder):
neon_env_builder.enable_pageserver_remote_storage(
RemoteStorageKind.MOCK_S3,

View File

@@ -120,7 +120,7 @@ def post_checks(env: NeonEnv, test_output_dir: Path, db_name: str, endpoint: End
# Run the main PostgreSQL regression tests, in src/test/regress.
#
@pytest.mark.timeout(3000) # Contains many sub-tests, is slow in debug builds
@pytest.mark.timeout(900) # Contains many sub-tests, is slow in debug builds
@pytest.mark.parametrize("shard_count", [None, 4])
def test_pg_regress(
neon_env_builder: NeonEnvBuilder,
@@ -194,7 +194,7 @@ def test_pg_regress(
# Run the PostgreSQL "isolation" tests, in src/test/isolation.
#
@pytest.mark.timeout(1500) # Contains many sub-tests, is slow in debug builds
@pytest.mark.timeout(600) # Contains many sub-tests, is slow in debug builds
@pytest.mark.parametrize("shard_count", [None, 4])
def test_isolation(
neon_env_builder: NeonEnvBuilder,
@@ -222,8 +222,6 @@ def test_isolation(
"max_prepared_transactions=100",
# Enable the test mode, so that we don't need to patch the test cases.
"neon.regress_test_mode = true",
# Stack size should be increased for tests to pass with asan.
"max_stack_depth = 4MB",
],
)
endpoint.safe_psql(f"CREATE DATABASE {DBNAME}")
@@ -419,7 +417,7 @@ def test_tx_abort_with_many_relations(
try:
# Rollback phase should be fast: this is one WAL record that we should process efficiently
fut = exec.submit(rollback_and_wait)
fut.result(timeout=15)
fut.result(timeout=5)
except:
exec.shutdown(wait=False, cancel_futures=True)
raise

View File

@@ -57,7 +57,7 @@ def test_proxy_select_1(static_proxy: NeonProxy):
assert out[0][0] == 1
# with SNI
out = static_proxy.safe_psql("select 42", host="generic-project-name.local.neon.build")
out = static_proxy.safe_psql("select 42", host="generic-project-name.localtest.me")
assert out[0][0] == 42
@@ -234,7 +234,7 @@ def test_sql_over_http_serverless_driver(static_proxy: NeonProxy):
connstr = f"postgresql://http:http@{static_proxy.domain}:{static_proxy.proxy_port}/postgres"
response = requests.post(
f"https://api.local.neon.build:{static_proxy.external_http_port}/sql",
f"https://api.localtest.me:{static_proxy.external_http_port}/sql",
data=json.dumps({"query": "select 42 as answer", "params": []}),
headers={"Content-Type": "application/sql", "Neon-Connection-String": connstr},
verify=str(static_proxy.test_output_dir / "proxy.crt"),

View File

@@ -35,7 +35,7 @@ async def test_proxy_psql_allowed_ips(static_proxy: NeonProxy, vanilla_pg: Vanil
check_cannot_connect(query="select 1", sslsni=0, options="endpoint=private-project")
# with SNI
check_cannot_connect(query="select 1", host="private-project.local.neon.build")
check_cannot_connect(query="select 1", host="private-project.localtest.me")
# no SNI, deprecated `options=project` syntax (before we had several endpoint in project)
out = static_proxy.safe_psql(query="select 1", sslsni=0, options="project=generic-project")
@@ -46,7 +46,7 @@ async def test_proxy_psql_allowed_ips(static_proxy: NeonProxy, vanilla_pg: Vanil
assert out[0][0] == 1
# with SNI
out = static_proxy.safe_psql(query="select 1", host="generic-project.local.neon.build")
out = static_proxy.safe_psql(query="select 1", host="generic-project.localtest.me")
assert out[0][0] == 1

View File

@@ -116,7 +116,7 @@ def test_pg_sni_router(
test_output_dir: Path,
):
generate_tls_cert(
"endpoint.namespace.local.neon.build",
"endpoint.namespace.localtest.me",
test_output_dir / "router.crt",
test_output_dir / "router.key",
)
@@ -130,7 +130,7 @@ def test_pg_sni_router(
with PgSniRouter(
neon_binpath=neon_binpath,
port=router_port,
destination="local.neon.build",
destination="localtest.me",
tls_cert=test_output_dir / "router.crt",
tls_key=test_output_dir / "router.key",
test_output_dir=test_output_dir,
@@ -141,7 +141,7 @@ def test_pg_sni_router(
"select 1",
dbname="postgres",
sslmode="require",
host=f"endpoint--namespace--{pg_port}.local.neon.build",
host=f"endpoint--namespace--{pg_port}.localtest.me",
hostaddr="127.0.0.1",
)
assert out[0][0] == 1

View File

@@ -3,14 +3,12 @@ from __future__ import annotations
import threading
import time
import pytest
from fixtures.neon_fixtures import NeonEnv
from fixtures.utils import wait_until
# This test checks of logical replication subscriber is able to correctly restart replication without receiving duplicates.
# It requires tracking information about replication origins at page server side
@pytest.mark.timeout(900) # This test is slow with sanitizers enabled, especially on ARM
def test_subscriber_restart(neon_simple_env: NeonEnv):
env = neon_simple_env
env.create_branch("publisher")

View File

@@ -554,33 +554,8 @@ def test_timeline_archival_chaos(neon_env_builder: NeonEnvBuilder):
log.info(f"Timeline {state.timeline_id} is still active")
shutdown.wait(0.5)
elif state.timeline_id in offloaded_ids:
log.info(f"Timeline {state.timeline_id} is now offloaded in memory")
# Hack: when we see something offloaded in the API, it doesn't guarantee that the offload
# is persistent (it is marked offloaded first, then that is persisted to the tenant manifest).
# So we wait until we see the manifest update before considering it offloaded, that way
# subsequent checks that it doesn't revert to active on a restart will pass reliably.
time.sleep(0.1)
assert isinstance(env.pageserver_remote_storage, S3Storage)
manifest = env.pageserver_remote_storage.download_tenant_manifest(
tenant_id
)
if manifest is None:
log.info(
f"Timeline {state.timeline_id} is not yet offloaded persistently (no manifest)"
)
elif str(state.timeline_id) in [
t["timeline_id"] for t in manifest["offloaded_timelines"]
]:
log.info(
f"Timeline {state.timeline_id} is now offloaded persistently"
)
state.offloaded = True
else:
log.info(
f"Timeline {state.timeline_id} is not yet offloaded persistently (manifest: {manifest})"
)
log.info(f"Timeline {state.timeline_id} is now offloaded")
state.offloaded = True
break
else:
# Timeline is neither offloaded nor active, this is unexpected: the pageserver

View File

@@ -13,12 +13,12 @@
# postgres -D data -p3000
#
# ## Launch proxy with WSS enabled:
# openssl req -new -x509 -days 365 -nodes -text -out server.crt -keyout server.key -subj '/CN=*.local.neon.build'
# openssl req -new -x509 -days 365 -nodes -text -out server.crt -keyout server.key -subj '/CN=*.neon.localtest.me'
# ./target/debug/proxy --wss 127.0.0.1:40433 --http 127.0.0.1:28080 --mgmt 127.0.0.1:9099 --proxy 127.0.0.1:4433 --tls-key server.key --tls-cert server.crt --auth-backend postgres
#
# ## Launch the tunnel:
#
# poetry run ./test_runner/websocket_tunnel.py --ws-port 40433 --ws-url "wss://ep-test.local.neon.build"
# poetry run ./test_runner/websocket_tunnel.py --ws-port 40433 --ws-url "wss://ep-test.neon.localtest.me"
#
# ## Now you can connect with psql:
# psql "postgresql://heikki@localhost:40433/postgres"

View File

@@ -1,11 +1,11 @@
{
"v17": [
"17.2",
"8dfd5a7030d3e8a98b60265ebe045788892ac7f3"
"f0ffc8279dbcbbc439981a4fd001a9687e5d665d"
],
"v16": [
"16.6",
"86d9ea96ebb9088eac62f57f1f5ace68e70e0d1c"
"3cf7ce1afab75027716d14223f95ddb300754162"
],
"v15": [
"15.10",