mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-29 11:00:38 +00:00
Compare commits
33 Commits
jcsp/storc
...
arpad/upda
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
aee532e4be | ||
|
|
a4ea1e53ae | ||
|
|
c26131c2b3 | ||
|
|
4ab18444ec | ||
|
|
98883e4b30 | ||
|
|
3d143ad799 | ||
|
|
b0c7ee0175 | ||
|
|
8c4e94107d | ||
|
|
c368b0fe14 | ||
|
|
aba61a3712 | ||
|
|
946da3f7e2 | ||
|
|
73633e27ed | ||
|
|
0cf0119751 | ||
|
|
b37f52fdf1 | ||
|
|
443c8d0b4b | ||
|
|
2f36bdb218 | ||
|
|
e7118213ab | ||
|
|
d204d51faf | ||
|
|
ac55e2dbe5 | ||
|
|
874accd6ed | ||
|
|
6cd3b501ec | ||
|
|
bf20d78292 | ||
|
|
2656c713a4 | ||
|
|
5e95860e70 | ||
|
|
0abff59e97 | ||
|
|
9609f7547e | ||
|
|
d6e87a3a9c | ||
|
|
f5243992fa | ||
|
|
95220ba43e | ||
|
|
08f92bb916 | ||
|
|
8f651f9582 | ||
|
|
b5a239c4ae | ||
|
|
de05258419 |
22
.github/workflows/_build-and-test-locally.yml
vendored
22
.github/workflows/_build-and-test-locally.yml
vendored
@@ -20,9 +20,14 @@ on:
|
||||
required: true
|
||||
type: string
|
||||
test-cfg:
|
||||
description: 'a json object of postgres versions and lfc/sanitizers states to build and run regression tests on'
|
||||
description: 'a json object of postgres versions and lfc states to run regression tests on'
|
||||
required: true
|
||||
type: string
|
||||
sanitizers:
|
||||
description: 'enabled or disabled'
|
||||
required: false
|
||||
default: 'disabled'
|
||||
type: string
|
||||
|
||||
defaults:
|
||||
run:
|
||||
@@ -48,8 +53,6 @@ jobs:
|
||||
# io_uring will account the memory of the CQ and SQ as locked.
|
||||
# More details: https://github.com/neondatabase/neon/issues/6373#issuecomment-1905814391
|
||||
options: --init --shm-size=512mb --ulimit memlock=67108864:67108864
|
||||
strategy:
|
||||
matrix: ${{ fromJSON(format('{{"include":{0}}}', inputs.test-cfg)) }}
|
||||
env:
|
||||
BUILD_TYPE: ${{ inputs.build-type }}
|
||||
GIT_VERSION: ${{ github.event.pull_request.head.sha || github.sha }}
|
||||
@@ -89,7 +92,7 @@ jobs:
|
||||
- name: Set env variables
|
||||
env:
|
||||
ARCH: ${{ inputs.arch }}
|
||||
SANITIZERS: ${{ matrix.sanitizers }}
|
||||
SANITIZERS: ${{ inputs.sanitizers }}
|
||||
run: |
|
||||
CARGO_FEATURES="--features testing"
|
||||
if [[ $BUILD_TYPE == "debug" && $ARCH == 'x64' ]]; then
|
||||
@@ -167,7 +170,7 @@ jobs:
|
||||
|
||||
- name: Run cargo build
|
||||
env:
|
||||
WITH_TESTS: ${{ matrix.sanitizers != 'enabled' && '--tests' || '' }}
|
||||
WITH_TESTS: ${{ inputs.sanitizers != 'enabled' && '--tests' || '' }}
|
||||
run: |
|
||||
export ASAN_OPTIONS=detect_leaks=0
|
||||
${cov_prefix} mold -run cargo build $CARGO_FLAGS $CARGO_FEATURES --bins ${WITH_TESTS}
|
||||
@@ -177,7 +180,7 @@ jobs:
|
||||
- name: Install rust binaries
|
||||
env:
|
||||
ARCH: ${{ inputs.arch }}
|
||||
SANITIZERS: ${{ matrix.sanitizers }}
|
||||
SANITIZERS: ${{ inputs.sanitizers }}
|
||||
run: |
|
||||
# Install target binaries
|
||||
mkdir -p /tmp/neon/bin/
|
||||
@@ -225,7 +228,7 @@ jobs:
|
||||
role-duration-seconds: 18000 # 5 hours
|
||||
|
||||
- name: Run rust tests
|
||||
if: ${{ matrix.sanitizers != 'enabled' }}
|
||||
if: ${{ inputs.sanitizers != 'enabled' }}
|
||||
env:
|
||||
NEXTEST_RETRIES: 3
|
||||
run: |
|
||||
@@ -287,6 +290,7 @@ jobs:
|
||||
DATABASE_URL: postgresql://localhost:1235/storage_controller
|
||||
POSTGRES_DISTRIB_DIR: /tmp/neon/pg_install
|
||||
run: |
|
||||
export ASAN_OPTIONS=detect_leaks=0
|
||||
/tmp/neon/bin/neon_local init
|
||||
/tmp/neon/bin/neon_local storage_controller start
|
||||
|
||||
@@ -333,7 +337,7 @@ jobs:
|
||||
- name: Pytest regression tests
|
||||
continue-on-error: ${{ matrix.lfc_state == 'with-lfc' && inputs.build-type == 'debug' }}
|
||||
uses: ./.github/actions/run-python-test-set
|
||||
timeout-minutes: ${{ matrix.sanitizers != 'enabled' && 60 || 180 }}
|
||||
timeout-minutes: ${{ inputs.sanitizers != 'enabled' && 60 || 180 }}
|
||||
with:
|
||||
build_type: ${{ inputs.build-type }}
|
||||
test_selection: regress
|
||||
@@ -351,7 +355,7 @@ jobs:
|
||||
PAGESERVER_VIRTUAL_FILE_IO_ENGINE: tokio-epoll-uring
|
||||
PAGESERVER_GET_VECTORED_CONCURRENT_IO: sidecar-task
|
||||
USE_LFC: ${{ matrix.lfc_state == 'with-lfc' && 'true' || 'false' }}
|
||||
SANITIZERS: ${{ matrix.sanitizers }}
|
||||
SANITIZERS: ${{ inputs.sanitizers }}
|
||||
|
||||
# Temporary disable this step until we figure out why it's so flaky
|
||||
# Ref https://github.com/neondatabase/neon/issues/4540
|
||||
|
||||
@@ -74,7 +74,8 @@ jobs:
|
||||
build-tools-image: ${{ needs.build-build-tools-image.outputs.image }}-bookworm
|
||||
build-tag: ${{ needs.tag.outputs.build-tag }}
|
||||
build-type: ${{ matrix.build-type }}
|
||||
test-cfg: '[{"pg_version":"v17", "sanitizers": "enabled"}]'
|
||||
test-cfg: '[{"pg_version":"v17"}]'
|
||||
sanitizers: enabled
|
||||
secrets: inherit
|
||||
|
||||
|
||||
|
||||
70
Cargo.lock
generated
70
Cargo.lock
generated
@@ -300,9 +300,9 @@ checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa"
|
||||
|
||||
[[package]]
|
||||
name = "aws-config"
|
||||
version = "1.5.15"
|
||||
version = "1.5.13"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "dc47e70fc35d054c8fcd296d47a61711f043ac80534a10b4f741904f81e73a90"
|
||||
checksum = "c03a50b30228d3af8865ce83376b4e99e1ffa34728220fe2860e4df0bb5278d6"
|
||||
dependencies = [
|
||||
"aws-credential-types",
|
||||
"aws-runtime",
|
||||
@@ -342,9 +342,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "aws-runtime"
|
||||
version = "1.5.4"
|
||||
version = "1.5.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "bee7643696e7fdd74c10f9eb42848a87fe469d35eae9c3323f80aa98f350baac"
|
||||
checksum = "b16d1aa50accc11a4b4d5c50f7fb81cc0cf60328259c587d0e6b0f11385bde46"
|
||||
dependencies = [
|
||||
"aws-credential-types",
|
||||
"aws-sigv4",
|
||||
@@ -368,9 +368,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "aws-sdk-iam"
|
||||
version = "1.60.0"
|
||||
version = "1.56.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a43daa438f8e7e4ebbbcb5c712b3b85db50d62e637a7da4ba9da51095d327460"
|
||||
checksum = "f2a089a65ceba0f649be19c7b2213d2e009bd7700159e280f03ad5ed2828d30c"
|
||||
dependencies = [
|
||||
"aws-credential-types",
|
||||
"aws-runtime",
|
||||
@@ -391,9 +391,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "aws-sdk-kms"
|
||||
version = "1.58.0"
|
||||
version = "1.54.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "40b7a24700ac548025a47a5c579886f5198895bb1eccd8964dfd71cd66c16912"
|
||||
checksum = "a6cf16c0e5853312995505557b876dd3f9fb9941e96d031383528ccef14ace57"
|
||||
dependencies = [
|
||||
"aws-credential-types",
|
||||
"aws-runtime",
|
||||
@@ -447,9 +447,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "aws-sdk-sso"
|
||||
version = "1.57.0"
|
||||
version = "1.53.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c54bab121fe1881a74c338c5f723d1592bf3b53167f80268a1274f404e1acc38"
|
||||
checksum = "1605dc0bf9f0a4b05b451441a17fcb0bda229db384f23bf5cead3adbab0664ac"
|
||||
dependencies = [
|
||||
"aws-credential-types",
|
||||
"aws-runtime",
|
||||
@@ -469,9 +469,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "aws-sdk-ssooidc"
|
||||
version = "1.58.0"
|
||||
version = "1.54.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8c8234fd024f7ac61c4e44ea008029bde934250f371efe7d4a39708397b1080c"
|
||||
checksum = "59f3f73466ff24f6ad109095e0f3f2c830bfb4cd6c8b12f744c8e61ebf4d3ba1"
|
||||
dependencies = [
|
||||
"aws-credential-types",
|
||||
"aws-runtime",
|
||||
@@ -491,9 +491,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "aws-sdk-sts"
|
||||
version = "1.58.0"
|
||||
version = "1.54.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ba60e1d519d6f23a9df712c04fdeadd7872ac911c84b2f62a8bda92e129b7962"
|
||||
checksum = "249b2acaa8e02fd4718705a9494e3eb633637139aa4bb09d70965b0448e865db"
|
||||
dependencies = [
|
||||
"aws-credential-types",
|
||||
"aws-runtime",
|
||||
@@ -514,9 +514,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "aws-sigv4"
|
||||
version = "1.2.8"
|
||||
version = "1.2.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "0bc5bbd1e4a2648fd8c5982af03935972c24a2f9846b396de661d351ee3ce837"
|
||||
checksum = "7d3820e0c08d0737872ff3c7c1f21ebbb6693d832312d6152bf18ef50a5471c2"
|
||||
dependencies = [
|
||||
"aws-credential-types",
|
||||
"aws-smithy-eventstream",
|
||||
@@ -543,9 +543,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "aws-smithy-async"
|
||||
version = "1.2.4"
|
||||
version = "1.2.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "fa59d1327d8b5053c54bf2eaae63bf629ba9e904434d0835a28ed3c0ed0a614e"
|
||||
checksum = "427cb637d15d63d6f9aae26358e1c9a9c09d5aa490d64b09354c8217cfef0f28"
|
||||
dependencies = [
|
||||
"futures-util",
|
||||
"pin-project-lite",
|
||||
@@ -575,9 +575,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "aws-smithy-eventstream"
|
||||
version = "0.60.6"
|
||||
version = "0.60.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8b18559a41e0c909b77625adf2b8c50de480a8041e5e4a3f5f7d177db70abc5a"
|
||||
checksum = "cef7d0a272725f87e51ba2bf89f8c21e4df61b9e49ae1ac367a6d69916ef7c90"
|
||||
dependencies = [
|
||||
"aws-smithy-types",
|
||||
"bytes",
|
||||
@@ -586,9 +586,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "aws-smithy-http"
|
||||
version = "0.60.12"
|
||||
version = "0.60.11"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "7809c27ad8da6a6a68c454e651d4962479e81472aa19ae99e59f9aba1f9713cc"
|
||||
checksum = "5c8bc3e8fdc6b8d07d976e301c02fe553f72a39b7a9fea820e023268467d7ab6"
|
||||
dependencies = [
|
||||
"aws-smithy-eventstream",
|
||||
"aws-smithy-runtime-api",
|
||||
@@ -607,9 +607,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "aws-smithy-json"
|
||||
version = "0.61.2"
|
||||
version = "0.61.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "623a51127f24c30776c8b374295f2df78d92517386f77ba30773f15a30ce1422"
|
||||
checksum = "ee4e69cc50921eb913c6b662f8d909131bb3e6ad6cb6090d3a39b66fc5c52095"
|
||||
dependencies = [
|
||||
"aws-smithy-types",
|
||||
]
|
||||
@@ -626,9 +626,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "aws-smithy-runtime"
|
||||
version = "1.7.7"
|
||||
version = "1.7.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "865f7050bbc7107a6c98a397a9fcd9413690c27fa718446967cf03b2d3ac517e"
|
||||
checksum = "a05dd41a70fc74051758ee75b5c4db2c0ca070ed9229c3df50e9475cda1cb985"
|
||||
dependencies = [
|
||||
"aws-smithy-async",
|
||||
"aws-smithy-http",
|
||||
@@ -670,9 +670,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "aws-smithy-types"
|
||||
version = "1.2.13"
|
||||
version = "1.2.11"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c7b8a53819e42f10d0821f56da995e1470b199686a1809168db6ca485665f042"
|
||||
checksum = "38ddc9bd6c28aeb303477170ddd183760a956a03e083b3902a990238a7e3792d"
|
||||
dependencies = [
|
||||
"base64-simd",
|
||||
"bytes",
|
||||
@@ -705,9 +705,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "aws-types"
|
||||
version = "1.3.5"
|
||||
version = "1.3.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "dfbd0a668309ec1f66c0f6bda4840dd6d4796ae26d699ebc266d7cc95c6d040f"
|
||||
checksum = "5221b91b3e441e6675310829fd8984801b772cb1546ef6c0e54dec9f1ac13fef"
|
||||
dependencies = [
|
||||
"aws-credential-types",
|
||||
"aws-smithy-async",
|
||||
@@ -777,7 +777,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "azure_core"
|
||||
version = "0.21.0"
|
||||
source = "git+https://github.com/neondatabase/azure-sdk-for-rust.git?branch=neon#66e77bdd87bf87e773acf3b0c84b532c1124367d"
|
||||
source = "git+https://github.com/neondatabase/azure-sdk-for-rust.git?branch=neon#985db729824be324ed11527e45de722250028d9e"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"base64 0.22.1",
|
||||
@@ -806,7 +806,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "azure_identity"
|
||||
version = "0.21.0"
|
||||
source = "git+https://github.com/neondatabase/azure-sdk-for-rust.git?branch=neon#66e77bdd87bf87e773acf3b0c84b532c1124367d"
|
||||
source = "git+https://github.com/neondatabase/azure-sdk-for-rust.git?branch=neon#985db729824be324ed11527e45de722250028d9e"
|
||||
dependencies = [
|
||||
"async-lock",
|
||||
"async-trait",
|
||||
@@ -825,7 +825,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "azure_storage"
|
||||
version = "0.21.0"
|
||||
source = "git+https://github.com/neondatabase/azure-sdk-for-rust.git?branch=neon#66e77bdd87bf87e773acf3b0c84b532c1124367d"
|
||||
source = "git+https://github.com/neondatabase/azure-sdk-for-rust.git?branch=neon#985db729824be324ed11527e45de722250028d9e"
|
||||
dependencies = [
|
||||
"RustyXML",
|
||||
"async-lock",
|
||||
@@ -843,7 +843,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "azure_storage_blobs"
|
||||
version = "0.21.0"
|
||||
source = "git+https://github.com/neondatabase/azure-sdk-for-rust.git?branch=neon#66e77bdd87bf87e773acf3b0c84b532c1124367d"
|
||||
source = "git+https://github.com/neondatabase/azure-sdk-for-rust.git?branch=neon#985db729824be324ed11527e45de722250028d9e"
|
||||
dependencies = [
|
||||
"RustyXML",
|
||||
"azure_core",
|
||||
@@ -863,7 +863,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "azure_svc_blobstorage"
|
||||
version = "0.21.0"
|
||||
source = "git+https://github.com/neondatabase/azure-sdk-for-rust.git?branch=neon#66e77bdd87bf87e773acf3b0c84b532c1124367d"
|
||||
source = "git+https://github.com/neondatabase/azure-sdk-for-rust.git?branch=neon#985db729824be324ed11527e45de722250028d9e"
|
||||
dependencies = [
|
||||
"azure_core",
|
||||
"bytes",
|
||||
|
||||
@@ -1578,7 +1578,15 @@ ENV BUILD_TAG=$BUILD_TAG
|
||||
USER nonroot
|
||||
# Copy entire project to get Cargo.* files with proper dependencies for the whole project
|
||||
COPY --chown=nonroot . .
|
||||
RUN mold -run cargo build --locked --profile release-line-debug-size-lto --bin compute_ctl --bin fast_import --bin local_proxy
|
||||
RUN --mount=type=cache,uid=1000,target=/home/nonroot/.cargo/registry \
|
||||
--mount=type=cache,uid=1000,target=/home/nonroot/.cargo/git \
|
||||
--mount=type=cache,uid=1000,target=/home/nonroot/target \
|
||||
mold -run cargo build --locked --profile release-line-debug-size-lto --bin compute_ctl --bin fast_import --bin local_proxy && \
|
||||
mkdir target-bin && \
|
||||
cp target/release-line-debug-size-lto/compute_ctl \
|
||||
target/release-line-debug-size-lto/fast_import \
|
||||
target/release-line-debug-size-lto/local_proxy \
|
||||
target-bin
|
||||
|
||||
#########################################################################################
|
||||
#
|
||||
@@ -1607,7 +1615,7 @@ RUN set -e \
|
||||
&& git clone --recurse-submodules --depth 1 --branch ${PGBOUNCER_TAG} https://github.com/pgbouncer/pgbouncer.git pgbouncer \
|
||||
&& cd pgbouncer \
|
||||
&& ./autogen.sh \
|
||||
&& LDFLAGS=-static ./configure --prefix=/usr/local/pgbouncer --without-openssl \
|
||||
&& ./configure --prefix=/usr/local/pgbouncer --without-openssl \
|
||||
&& make -j $(nproc) dist_man_MANS= \
|
||||
&& make install dist_man_MANS=
|
||||
|
||||
@@ -1641,6 +1649,29 @@ RUN echo -e "--retry-connrefused\n--connect-timeout 15\n--retry 5\n--max-time 30
|
||||
&& echo "${pgbouncer_exporter_sha256} pgbouncer_exporter" | sha256sum -c -\
|
||||
&& echo "${sql_exporter_sha256} sql_exporter" | sha256sum -c -
|
||||
|
||||
#########################################################################################
|
||||
#
|
||||
# Layer "awscli"
|
||||
#
|
||||
#########################################################################################
|
||||
FROM alpine/curl:${ALPINE_CURL_VERSION} AS awscli
|
||||
ARG TARGETARCH
|
||||
RUN set -ex; \
|
||||
if [ "${TARGETARCH}" = "amd64" ]; then \
|
||||
TARGETARCH_ALT="x86_64"; \
|
||||
CHECKSUM="c9a9df3770a3ff9259cb469b6179e02829687a464e0824d5c32d378820b53a00"; \
|
||||
elif [ "${TARGETARCH}" = "arm64" ]; then \
|
||||
TARGETARCH_ALT="aarch64"; \
|
||||
CHECKSUM="8181730be7891582b38b028112e81b4899ca817e8c616aad807c9e9d1289223a"; \
|
||||
else \
|
||||
echo "Unsupported architecture: ${TARGETARCH}"; exit 1; \
|
||||
fi; \
|
||||
curl --retry 5 -L "https://awscli.amazonaws.com/awscli-exe-linux-${TARGETARCH_ALT}-2.17.5.zip" -o /tmp/awscliv2.zip; \
|
||||
echo "${CHECKSUM} /tmp/awscliv2.zip" | sha256sum -c -; \
|
||||
unzip /tmp/awscliv2.zip -d /tmp/awscliv2; \
|
||||
/tmp/awscliv2/aws/install; \
|
||||
rm -rf /tmp/awscliv2.zip /tmp/awscliv2
|
||||
|
||||
#########################################################################################
|
||||
#
|
||||
# Clean up postgres folder before inclusion
|
||||
@@ -1754,16 +1785,19 @@ RUN mkdir /var/db && useradd -m -d /var/db/postgres postgres && \
|
||||
# create folder for file cache
|
||||
mkdir -p -m 777 /neon/cache
|
||||
|
||||
# aws cli is used by fast_import
|
||||
COPY --from=awscli /usr/local/aws-cli /usr/local/aws-cli
|
||||
|
||||
COPY --from=postgres-cleanup-layer --chown=postgres /usr/local/pgsql /usr/local
|
||||
COPY --from=compute-tools --chown=postgres /home/nonroot/target/release-line-debug-size-lto/compute_ctl /usr/local/bin/compute_ctl
|
||||
COPY --from=compute-tools --chown=postgres /home/nonroot/target/release-line-debug-size-lto/fast_import /usr/local/bin/fast_import
|
||||
COPY --from=compute-tools --chown=postgres /home/nonroot/target-bin/compute_ctl /usr/local/bin/compute_ctl
|
||||
COPY --from=compute-tools --chown=postgres /home/nonroot/target-bin/fast_import /usr/local/bin/fast_import
|
||||
|
||||
# pgbouncer and its config
|
||||
COPY --from=pgbouncer /usr/local/pgbouncer/bin/pgbouncer /usr/local/bin/pgbouncer
|
||||
COPY --chmod=0666 --chown=postgres compute/etc/pgbouncer.ini /etc/pgbouncer.ini
|
||||
|
||||
# local_proxy and its config
|
||||
COPY --from=compute-tools --chown=postgres /home/nonroot/target/release-line-debug-size-lto/local_proxy /usr/local/bin/local_proxy
|
||||
COPY --from=compute-tools --chown=postgres /home/nonroot/target-bin/local_proxy /usr/local/bin/local_proxy
|
||||
RUN mkdir -p /etc/local_proxy && chown postgres:postgres /etc/local_proxy
|
||||
|
||||
# Metrics exporter binaries and configuration files
|
||||
@@ -1790,6 +1824,7 @@ RUN mkdir /usr/local/download_extensions && chown -R postgres:postgres /usr/loca
|
||||
# libzstd1 for zstd
|
||||
# libboost* for rdkit
|
||||
# ca-certificates for communicating with s3 by compute_ctl
|
||||
# libevent for pgbouncer
|
||||
|
||||
RUN echo 'Acquire::Retries "5";' > /etc/apt/apt.conf.d/80-retries && \
|
||||
echo -e "retry_connrefused = on\ntimeout=15\ntries=5\n" > /root/.wgetrc
|
||||
@@ -1828,34 +1863,14 @@ RUN apt update && \
|
||||
libxslt1.1 \
|
||||
libzstd1 \
|
||||
libcurl4 \
|
||||
libevent-2.1-7 \
|
||||
locales \
|
||||
procps \
|
||||
ca-certificates \
|
||||
curl \
|
||||
unzip \
|
||||
$VERSION_INSTALLS && \
|
||||
apt clean && rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/* && \
|
||||
localedef -i en_US -c -f UTF-8 -A /usr/share/locale/locale.alias en_US.UTF-8
|
||||
|
||||
# aws cli is used by fast_import (curl and unzip above are at this time only used for this installation step)
|
||||
ARG TARGETARCH
|
||||
RUN set -ex; \
|
||||
if [ "${TARGETARCH}" = "amd64" ]; then \
|
||||
TARGETARCH_ALT="x86_64"; \
|
||||
CHECKSUM="c9a9df3770a3ff9259cb469b6179e02829687a464e0824d5c32d378820b53a00"; \
|
||||
elif [ "${TARGETARCH}" = "arm64" ]; then \
|
||||
TARGETARCH_ALT="aarch64"; \
|
||||
CHECKSUM="8181730be7891582b38b028112e81b4899ca817e8c616aad807c9e9d1289223a"; \
|
||||
else \
|
||||
echo "Unsupported architecture: ${TARGETARCH}"; exit 1; \
|
||||
fi; \
|
||||
curl --retry 5 -L "https://awscli.amazonaws.com/awscli-exe-linux-${TARGETARCH_ALT}-2.17.5.zip" -o /tmp/awscliv2.zip; \
|
||||
echo "${CHECKSUM} /tmp/awscliv2.zip" | sha256sum -c -; \
|
||||
unzip /tmp/awscliv2.zip -d /tmp/awscliv2; \
|
||||
/tmp/awscliv2/aws/install; \
|
||||
rm -rf /tmp/awscliv2.zip /tmp/awscliv2; \
|
||||
true
|
||||
|
||||
ENV LANG=en_US.utf8
|
||||
USER postgres
|
||||
ENTRYPOINT ["/usr/local/bin/compute_ctl"]
|
||||
|
||||
@@ -47,7 +47,9 @@ files:
|
||||
# Allow postgres user (which is what compute_ctl runs as) to run /neonvm/bin/resize-swap
|
||||
# and /neonvm/bin/set-disk-quota as root without requiring entering a password (NOPASSWD),
|
||||
# regardless of hostname (ALL)
|
||||
postgres ALL=(root) NOPASSWD: /neonvm/bin/resize-swap, /neonvm/bin/set-disk-quota
|
||||
#
|
||||
# Also allow it to shut down the VM. The fast_import job does that when it's finished.
|
||||
postgres ALL=(root) NOPASSWD: /neonvm/bin/resize-swap, /neonvm/bin/set-disk-quota, /neonvm/bin/poweroff
|
||||
- filename: cgconfig.conf
|
||||
content: |
|
||||
# Configuration for cgroups in VM compute nodes
|
||||
|
||||
@@ -41,6 +41,7 @@ use std::process::exit;
|
||||
use std::str::FromStr;
|
||||
use std::sync::atomic::Ordering;
|
||||
use std::sync::{mpsc, Arc, Condvar, Mutex, RwLock};
|
||||
use std::time::SystemTime;
|
||||
use std::{thread, time::Duration};
|
||||
|
||||
use anyhow::{Context, Result};
|
||||
@@ -85,6 +86,19 @@ fn parse_remote_ext_config(arg: &str) -> Result<String> {
|
||||
}
|
||||
}
|
||||
|
||||
/// Generate a compute ID if one is not supplied. This exists to keep forward
|
||||
/// compatibility tests working, but will be removed in a future iteration.
|
||||
fn generate_compute_id() -> String {
|
||||
let now = SystemTime::now();
|
||||
|
||||
format!(
|
||||
"compute-{}",
|
||||
now.duration_since(SystemTime::UNIX_EPOCH)
|
||||
.unwrap()
|
||||
.as_secs()
|
||||
)
|
||||
}
|
||||
|
||||
#[derive(Parser)]
|
||||
#[command(rename_all = "kebab-case")]
|
||||
struct Cli {
|
||||
@@ -130,17 +144,26 @@ struct Cli {
|
||||
#[arg(short = 'S', long, group = "spec-path")]
|
||||
pub spec_path: Option<OsString>,
|
||||
|
||||
#[arg(short = 'i', long, group = "compute-id", conflicts_with_all = ["spec", "spec-path"])]
|
||||
pub compute_id: Option<String>,
|
||||
#[arg(short = 'i', long, group = "compute-id", default_value = generate_compute_id())]
|
||||
pub compute_id: String,
|
||||
|
||||
#[arg(short = 'p', long, conflicts_with_all = ["spec", "spec-path"], requires = "compute-id", value_name = "CONTROL_PLANE_API_BASE_URL")]
|
||||
#[arg(short = 'p', long, conflicts_with_all = ["spec", "spec-path"], value_name = "CONTROL_PLANE_API_BASE_URL")]
|
||||
pub control_plane_uri: Option<String>,
|
||||
}
|
||||
|
||||
fn main() -> Result<()> {
|
||||
let cli = Cli::parse();
|
||||
|
||||
let build_tag = init()?;
|
||||
// For historical reasons, the main thread that processes the spec and launches postgres
|
||||
// is synchronous, but we always have this tokio runtime available and we "enter" it so
|
||||
// that you can use tokio::spawn() and tokio::runtime::Handle::current().block_on(...)
|
||||
// from all parts of compute_ctl.
|
||||
let runtime = tokio::runtime::Builder::new_multi_thread()
|
||||
.enable_all()
|
||||
.build()?;
|
||||
let _rt_guard = runtime.enter();
|
||||
|
||||
let build_tag = runtime.block_on(init())?;
|
||||
|
||||
let scenario = failpoint_support::init();
|
||||
|
||||
@@ -172,8 +195,8 @@ fn main() -> Result<()> {
|
||||
deinit_and_exit(wait_pg_result);
|
||||
}
|
||||
|
||||
fn init() -> Result<String> {
|
||||
init_tracing_and_logging(DEFAULT_LOG_LEVEL)?;
|
||||
async fn init() -> Result<String> {
|
||||
init_tracing_and_logging(DEFAULT_LOG_LEVEL).await?;
|
||||
|
||||
let mut signals = Signals::new([SIGINT, SIGTERM, SIGQUIT])?;
|
||||
thread::spawn(move || {
|
||||
@@ -259,20 +282,11 @@ fn try_spec_from_cli(cli: &Cli) -> Result<CliSpecParams> {
|
||||
});
|
||||
}
|
||||
|
||||
if cli.compute_id.is_none() {
|
||||
panic!(
|
||||
"compute spec should be provided by one of the following ways: \
|
||||
--spec OR --spec-path OR --control-plane-uri and --compute-id"
|
||||
);
|
||||
};
|
||||
if cli.control_plane_uri.is_none() {
|
||||
panic!("must specify both --control-plane-uri and --compute-id or none");
|
||||
panic!("must specify --control-plane-uri");
|
||||
};
|
||||
|
||||
match get_spec_from_control_plane(
|
||||
cli.control_plane_uri.as_ref().unwrap(),
|
||||
cli.compute_id.as_ref().unwrap(),
|
||||
) {
|
||||
match get_spec_from_control_plane(cli.control_plane_uri.as_ref().unwrap(), &cli.compute_id) {
|
||||
Ok(spec) => Ok(CliSpecParams {
|
||||
spec,
|
||||
live_config_allowed: true,
|
||||
@@ -319,6 +333,7 @@ fn wait_spec(
|
||||
let tokio_conn_conf = tokio_postgres::config::Config::from_str(connstr.as_str())
|
||||
.context("cannot build tokio postgres config from connstr")?;
|
||||
let compute_node = ComputeNode {
|
||||
compute_id: cli.compute_id.clone(),
|
||||
connstr,
|
||||
conn_conf,
|
||||
tokio_conn_conf,
|
||||
@@ -345,8 +360,7 @@ fn wait_spec(
|
||||
|
||||
// Launch http service first, so that we can serve control-plane requests
|
||||
// while configuration is still in progress.
|
||||
let _http_handle =
|
||||
launch_http_server(cli.http_port, &compute).expect("cannot launch http endpoint thread");
|
||||
let _http_handle = launch_http_server(cli.http_port, &compute);
|
||||
|
||||
if !spec_set {
|
||||
// No spec provided, hang waiting for it.
|
||||
@@ -484,21 +498,6 @@ fn start_postgres(
|
||||
use std::env;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
|
||||
// Note: it seems like you can make a runtime in an inner scope and
|
||||
// if you start a task in it it won't be dropped. However, make it
|
||||
// in the outermost scope just to be safe.
|
||||
let rt = if env::var_os("AUTOSCALING").is_some() {
|
||||
Some(
|
||||
tokio::runtime::Builder::new_multi_thread()
|
||||
.worker_threads(4)
|
||||
.enable_all()
|
||||
.build()
|
||||
.expect("failed to create tokio runtime for monitor")
|
||||
)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
// This token is used internally by the monitor to clean up all threads
|
||||
let token = CancellationToken::new();
|
||||
|
||||
@@ -509,16 +508,19 @@ fn start_postgres(
|
||||
Some(cli.filecache_connstr.clone())
|
||||
};
|
||||
|
||||
let vm_monitor = rt.as_ref().map(|rt| {
|
||||
rt.spawn(vm_monitor::start(
|
||||
let vm_monitor = if env::var_os("AUTOSCALING").is_some() {
|
||||
let vm_monitor = tokio::spawn(vm_monitor::start(
|
||||
Box::leak(Box::new(vm_monitor::Args {
|
||||
cgroup: Some(cli.cgroup.clone()),
|
||||
pgconnstr,
|
||||
addr: cli.vm_monitor_addr.clone(),
|
||||
})),
|
||||
token.clone(),
|
||||
))
|
||||
});
|
||||
));
|
||||
Some(vm_monitor)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
@@ -528,8 +530,6 @@ fn start_postgres(
|
||||
delay_exit,
|
||||
compute,
|
||||
#[cfg(target_os = "linux")]
|
||||
rt,
|
||||
#[cfg(target_os = "linux")]
|
||||
token,
|
||||
#[cfg(target_os = "linux")]
|
||||
vm_monitor,
|
||||
@@ -537,15 +537,13 @@ fn start_postgres(
|
||||
))
|
||||
}
|
||||
|
||||
type PostgresHandle = (std::process::Child, std::thread::JoinHandle<()>);
|
||||
type PostgresHandle = (std::process::Child, tokio::task::JoinHandle<Result<()>>);
|
||||
|
||||
struct StartPostgresResult {
|
||||
delay_exit: bool,
|
||||
// passed through from WaitSpecResult
|
||||
compute: Arc<ComputeNode>,
|
||||
|
||||
#[cfg(target_os = "linux")]
|
||||
rt: Option<tokio::runtime::Runtime>,
|
||||
#[cfg(target_os = "linux")]
|
||||
token: tokio_util::sync::CancellationToken,
|
||||
#[cfg(target_os = "linux")]
|
||||
@@ -564,10 +562,10 @@ fn wait_postgres(pg: Option<PostgresHandle>) -> Result<WaitPostgresResult> {
|
||||
.expect("failed to start waiting on Postgres process");
|
||||
PG_PID.store(0, Ordering::SeqCst);
|
||||
|
||||
// Process has exited, so we can join the logs thread.
|
||||
let _ = logs_handle
|
||||
.join()
|
||||
.map_err(|e| tracing::error!("log thread panicked: {:?}", e));
|
||||
// Process has exited. Wait for the log collecting task to finish.
|
||||
let _ = tokio::runtime::Handle::current()
|
||||
.block_on(logs_handle)
|
||||
.map_err(|e| tracing::error!("log task panicked: {:?}", e));
|
||||
|
||||
info!("Postgres exited with code {}, shutting down", ecode);
|
||||
exit_code = ecode.code()
|
||||
@@ -588,8 +586,6 @@ fn cleanup_after_postgres_exit(
|
||||
vm_monitor,
|
||||
#[cfg(target_os = "linux")]
|
||||
token,
|
||||
#[cfg(target_os = "linux")]
|
||||
rt,
|
||||
}: StartPostgresResult,
|
||||
) -> Result<bool> {
|
||||
// Terminate the vm_monitor so it releases the file watcher on
|
||||
@@ -602,10 +598,6 @@ fn cleanup_after_postgres_exit(
|
||||
token.cancel();
|
||||
// Kills the actual task running the monitor
|
||||
handle.abort();
|
||||
|
||||
// If handle is some, rt must have been used to produce it, and
|
||||
// hence is also some
|
||||
rt.unwrap().shutdown_timeout(Duration::from_secs(2));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -140,5 +140,34 @@ pub async fn get_database_schema(
|
||||
warn!("pg_dump stderr: {}", line)
|
||||
}
|
||||
});
|
||||
Ok(initial_stream.chain(stdout_reader.map(|res| res.map(|b| b.freeze()))))
|
||||
|
||||
#[allow(dead_code)]
|
||||
struct SchemaStream<S> {
|
||||
// We keep a reference to the child process to ensure it stays alive
|
||||
// while the stream is being consumed. When SchemaStream is dropped,
|
||||
// cmd will be dropped, which triggers kill_on_drop and terminates pg_dump
|
||||
cmd: tokio::process::Child,
|
||||
stream: S,
|
||||
}
|
||||
|
||||
impl<S> Stream for SchemaStream<S>
|
||||
where
|
||||
S: Stream<Item = Result<bytes::Bytes, std::io::Error>> + Unpin,
|
||||
{
|
||||
type Item = Result<bytes::Bytes, std::io::Error>;
|
||||
|
||||
fn poll_next(
|
||||
mut self: std::pin::Pin<&mut Self>,
|
||||
cx: &mut std::task::Context<'_>,
|
||||
) -> std::task::Poll<Option<Self::Item>> {
|
||||
Stream::poll_next(std::pin::Pin::new(&mut self.stream), cx)
|
||||
}
|
||||
}
|
||||
|
||||
let schema_stream = SchemaStream {
|
||||
cmd,
|
||||
stream: initial_stream.chain(stdout_reader.map(|res| res.map(|b| b.freeze()))),
|
||||
};
|
||||
|
||||
Ok(schema_stream)
|
||||
}
|
||||
|
||||
@@ -9,7 +9,6 @@ use std::str::FromStr;
|
||||
use std::sync::atomic::AtomicU32;
|
||||
use std::sync::atomic::Ordering;
|
||||
use std::sync::{Arc, Condvar, Mutex, RwLock};
|
||||
use std::thread;
|
||||
use std::time::Duration;
|
||||
use std::time::Instant;
|
||||
|
||||
@@ -59,6 +58,8 @@ pub static PG_PID: AtomicU32 = AtomicU32::new(0);
|
||||
|
||||
/// Compute node info shared across several `compute_ctl` threads.
|
||||
pub struct ComputeNode {
|
||||
/// The ID of the compute
|
||||
pub compute_id: String,
|
||||
// Url type maintains proper escaping
|
||||
pub connstr: url::Url,
|
||||
// We connect to Postgres from many different places, so build configs once
|
||||
@@ -546,11 +547,7 @@ impl ComputeNode {
|
||||
pub fn check_safekeepers_synced(&self, compute_state: &ComputeState) -> Result<Option<Lsn>> {
|
||||
let start_time = Utc::now();
|
||||
|
||||
// Run actual work with new tokio runtime
|
||||
let rt = tokio::runtime::Builder::new_current_thread()
|
||||
.enable_all()
|
||||
.build()
|
||||
.expect("failed to create rt");
|
||||
let rt = tokio::runtime::Handle::current();
|
||||
let result = rt.block_on(self.check_safekeepers_synced_async(compute_state));
|
||||
|
||||
// Record runtime
|
||||
@@ -597,9 +594,9 @@ impl ComputeNode {
|
||||
SYNC_SAFEKEEPERS_PID.store(0, Ordering::SeqCst);
|
||||
|
||||
// Process has exited, so we can join the logs thread.
|
||||
let _ = logs_handle
|
||||
.join()
|
||||
.map_err(|e| tracing::error!("log thread panicked: {:?}", e));
|
||||
let _ = tokio::runtime::Handle::current()
|
||||
.block_on(logs_handle)
|
||||
.map_err(|e| tracing::error!("log task panicked: {:?}", e));
|
||||
|
||||
if !sync_output.status.success() {
|
||||
anyhow::bail!(
|
||||
@@ -784,7 +781,7 @@ impl ComputeNode {
|
||||
pub fn start_postgres(
|
||||
&self,
|
||||
storage_auth_token: Option<String>,
|
||||
) -> Result<(std::process::Child, std::thread::JoinHandle<()>)> {
|
||||
) -> Result<(std::process::Child, tokio::task::JoinHandle<Result<()>>)> {
|
||||
let pgdata_path = Path::new(&self.pgdata);
|
||||
|
||||
// Run postgres as a child process.
|
||||
@@ -800,7 +797,7 @@ impl ComputeNode {
|
||||
.expect("cannot start postgres process");
|
||||
PG_PID.store(pg.id(), Ordering::SeqCst);
|
||||
|
||||
// Start a thread to collect logs from stderr.
|
||||
// Start a task to collect logs from stderr.
|
||||
let stderr = pg.stderr.take().expect("stderr should be captured");
|
||||
let logs_handle = handle_postgres_logs(stderr);
|
||||
|
||||
@@ -809,20 +806,28 @@ impl ComputeNode {
|
||||
Ok((pg, logs_handle))
|
||||
}
|
||||
|
||||
/// Do post configuration of the already started Postgres. This function spawns a background thread to
|
||||
/// Do post configuration of the already started Postgres. This function spawns a background task to
|
||||
/// configure the database after applying the compute spec. Currently, it upgrades the neon extension
|
||||
/// version. In the future, it may upgrade all 3rd-party extensions.
|
||||
#[instrument(skip_all)]
|
||||
pub fn post_apply_config(&self) -> Result<()> {
|
||||
let conf = self.get_conn_conf(Some("compute_ctl:post_apply_config"));
|
||||
thread::spawn(move || {
|
||||
let func = || {
|
||||
let mut client = conf.connect(NoTls)?;
|
||||
let conf = self.get_tokio_conn_conf(Some("compute_ctl:post_apply_config"));
|
||||
tokio::spawn(async move {
|
||||
let res = async {
|
||||
let (mut client, connection) = conf.connect(NoTls).await?;
|
||||
tokio::spawn(async move {
|
||||
if let Err(e) = connection.await {
|
||||
eprintln!("connection error: {}", e);
|
||||
}
|
||||
});
|
||||
|
||||
handle_neon_extension_upgrade(&mut client)
|
||||
.await
|
||||
.context("handle_neon_extension_upgrade")?;
|
||||
Ok::<_, anyhow::Error>(())
|
||||
};
|
||||
if let Err(err) = func() {
|
||||
}
|
||||
.await;
|
||||
if let Err(err) = res {
|
||||
error!("error while post_apply_config: {err:#}");
|
||||
}
|
||||
});
|
||||
@@ -919,13 +924,10 @@ impl ComputeNode {
|
||||
conf: Arc<tokio_postgres::Config>,
|
||||
concurrency: usize,
|
||||
) -> Result<()> {
|
||||
let rt = tokio::runtime::Builder::new_multi_thread()
|
||||
.enable_all()
|
||||
.build()?;
|
||||
|
||||
info!("Applying config with max {} concurrency", concurrency);
|
||||
debug!("Config: {:?}", spec);
|
||||
|
||||
let rt = tokio::runtime::Handle::current();
|
||||
rt.block_on(async {
|
||||
// Proceed with post-startup configuration. Note, that order of operations is important.
|
||||
let client = Self::get_maintenance_client(&conf).await?;
|
||||
@@ -1319,14 +1321,18 @@ impl ComputeNode {
|
||||
}
|
||||
|
||||
// Run migrations separately to not hold up cold starts
|
||||
thread::spawn(move || {
|
||||
let conf = conf.as_ref().clone();
|
||||
let mut conf = postgres::config::Config::from(conf);
|
||||
tokio::spawn(async move {
|
||||
let mut conf = conf.as_ref().clone();
|
||||
conf.application_name("compute_ctl:migrations");
|
||||
|
||||
match conf.connect(NoTls) {
|
||||
Ok(mut client) => {
|
||||
if let Err(e) = handle_migrations(&mut client) {
|
||||
match conf.connect(NoTls).await {
|
||||
Ok((mut client, connection)) => {
|
||||
tokio::spawn(async move {
|
||||
if let Err(e) = connection.await {
|
||||
eprintln!("connection error: {}", e);
|
||||
}
|
||||
});
|
||||
if let Err(e) = handle_migrations(&mut client).await {
|
||||
error!("Failed to run migrations: {}", e);
|
||||
}
|
||||
}
|
||||
@@ -1363,16 +1369,11 @@ impl ComputeNode {
|
||||
if let Some(ref pgbouncer_settings) = spec.pgbouncer_settings {
|
||||
info!("tuning pgbouncer");
|
||||
|
||||
let rt = tokio::runtime::Builder::new_current_thread()
|
||||
.enable_all()
|
||||
.build()
|
||||
.expect("failed to create rt");
|
||||
|
||||
// Spawn a thread to do the tuning,
|
||||
// Spawn a background task to do the tuning,
|
||||
// so that we don't block the main thread that starts Postgres.
|
||||
let pgbouncer_settings = pgbouncer_settings.clone();
|
||||
let _handle = thread::spawn(move || {
|
||||
let res = rt.block_on(tune_pgbouncer(pgbouncer_settings));
|
||||
tokio::spawn(async move {
|
||||
let res = tune_pgbouncer(pgbouncer_settings).await;
|
||||
if let Err(err) = res {
|
||||
error!("error while tuning pgbouncer: {err:?}");
|
||||
}
|
||||
@@ -1382,14 +1383,14 @@ impl ComputeNode {
|
||||
if let Some(ref local_proxy) = spec.local_proxy_config {
|
||||
info!("configuring local_proxy");
|
||||
|
||||
// Spawn a thread to do the configuration,
|
||||
// Spawn a background task to do the configuration,
|
||||
// so that we don't block the main thread that starts Postgres.
|
||||
let local_proxy = local_proxy.clone();
|
||||
let _handle = Some(thread::spawn(move || {
|
||||
tokio::spawn(async move {
|
||||
if let Err(err) = local_proxy::configure(&local_proxy) {
|
||||
error!("error while configuring local_proxy: {err:?}");
|
||||
}
|
||||
}));
|
||||
});
|
||||
}
|
||||
|
||||
// Write new config
|
||||
@@ -1431,7 +1432,9 @@ impl ComputeNode {
|
||||
}
|
||||
|
||||
#[instrument(skip_all)]
|
||||
pub fn start_compute(&self) -> Result<(std::process::Child, std::thread::JoinHandle<()>)> {
|
||||
pub fn start_compute(
|
||||
&self,
|
||||
) -> Result<(std::process::Child, tokio::task::JoinHandle<Result<()>>)> {
|
||||
let compute_state = self.state.lock().unwrap().clone();
|
||||
let pspec = compute_state.pspec.as_ref().expect("spec must be set");
|
||||
info!(
|
||||
@@ -1446,16 +1449,11 @@ impl ComputeNode {
|
||||
if let Some(pgbouncer_settings) = &pspec.spec.pgbouncer_settings {
|
||||
info!("tuning pgbouncer");
|
||||
|
||||
let rt = tokio::runtime::Builder::new_current_thread()
|
||||
.enable_all()
|
||||
.build()
|
||||
.expect("failed to create rt");
|
||||
|
||||
// Spawn a thread to do the tuning,
|
||||
// Spawn a background task to do the tuning,
|
||||
// so that we don't block the main thread that starts Postgres.
|
||||
let pgbouncer_settings = pgbouncer_settings.clone();
|
||||
let _handle = thread::spawn(move || {
|
||||
let res = rt.block_on(tune_pgbouncer(pgbouncer_settings));
|
||||
let _handle = tokio::spawn(async move {
|
||||
let res = tune_pgbouncer(pgbouncer_settings).await;
|
||||
if let Err(err) = res {
|
||||
error!("error while tuning pgbouncer: {err:?}");
|
||||
}
|
||||
@@ -1465,10 +1463,10 @@ impl ComputeNode {
|
||||
if let Some(local_proxy) = &pspec.spec.local_proxy_config {
|
||||
info!("configuring local_proxy");
|
||||
|
||||
// Spawn a thread to do the configuration,
|
||||
// Spawn a background task to do the configuration,
|
||||
// so that we don't block the main thread that starts Postgres.
|
||||
let local_proxy = local_proxy.clone();
|
||||
let _handle = thread::spawn(move || {
|
||||
let _handle = tokio::spawn(async move {
|
||||
if let Err(err) = local_proxy::configure(&local_proxy) {
|
||||
error!("error while configuring local_proxy: {err:?}");
|
||||
}
|
||||
@@ -1487,7 +1485,8 @@ impl ComputeNode {
|
||||
extension_server::create_control_files(remote_extensions, &self.pgbin);
|
||||
|
||||
let library_load_start_time = Utc::now();
|
||||
let remote_ext_metrics = self.prepare_preload_libraries(&pspec.spec)?;
|
||||
let rt = tokio::runtime::Handle::current();
|
||||
let remote_ext_metrics = rt.block_on(self.prepare_preload_libraries(&pspec.spec))?;
|
||||
|
||||
let library_load_time = Utc::now()
|
||||
.signed_duration_since(library_load_start_time)
|
||||
@@ -1542,7 +1541,7 @@ impl ComputeNode {
|
||||
self.post_apply_config()?;
|
||||
|
||||
let conf = self.get_conn_conf(None);
|
||||
thread::spawn(move || {
|
||||
tokio::task::spawn_blocking(|| {
|
||||
let res = get_installed_extensions(conf);
|
||||
match res {
|
||||
Ok(extensions) => {
|
||||
@@ -1891,7 +1890,6 @@ LIMIT 100",
|
||||
Ok(ext_version)
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
pub async fn prepare_preload_libraries(
|
||||
&self,
|
||||
spec: &ComputeSpec,
|
||||
|
||||
@@ -51,9 +51,12 @@ fn configurator_main_loop(compute: &Arc<ComputeNode>) {
|
||||
pub fn launch_configurator(compute: &Arc<ComputeNode>) -> thread::JoinHandle<()> {
|
||||
let compute = Arc::clone(compute);
|
||||
|
||||
let runtime = tokio::runtime::Handle::current();
|
||||
|
||||
thread::Builder::new()
|
||||
.name("compute-configurator".into())
|
||||
.spawn(move || {
|
||||
let _rt_guard = runtime.enter();
|
||||
configurator_main_loop(&compute);
|
||||
info!("configurator thread is exited");
|
||||
})
|
||||
|
||||
@@ -1,11 +1,9 @@
|
||||
use std::{
|
||||
net::{IpAddr, Ipv6Addr, SocketAddr},
|
||||
sync::Arc,
|
||||
thread,
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
use anyhow::Result;
|
||||
use axum::{
|
||||
extract::Request,
|
||||
middleware::{self, Next},
|
||||
@@ -46,7 +44,6 @@ async fn maybe_add_request_id_header(mut request: Request, next: Next) -> Respon
|
||||
}
|
||||
|
||||
/// Run the HTTP server and wait on it forever.
|
||||
#[tokio::main]
|
||||
async fn serve(port: u16, compute: Arc<ComputeNode>) {
|
||||
let mut app = Router::new()
|
||||
.route("/check_writability", post(check_writability::is_writable))
|
||||
@@ -139,11 +136,9 @@ async fn serve(port: u16, compute: Arc<ComputeNode>) {
|
||||
}
|
||||
}
|
||||
|
||||
/// Launch a separate HTTP server thread and return its `JoinHandle`.
|
||||
pub fn launch_http_server(port: u16, state: &Arc<ComputeNode>) -> Result<thread::JoinHandle<()>> {
|
||||
/// Launch HTTP server in a new task and return its `JoinHandle`.
|
||||
pub fn launch_http_server(port: u16, state: &Arc<ComputeNode>) -> tokio::task::JoinHandle<()> {
|
||||
let state = Arc::clone(state);
|
||||
|
||||
Ok(thread::Builder::new()
|
||||
.name("http-server".into())
|
||||
.spawn(move || serve(port, state))?)
|
||||
tokio::spawn(serve(port, state))
|
||||
}
|
||||
|
||||
@@ -11,7 +11,7 @@ use tracing_subscriber::prelude::*;
|
||||
/// set `OTEL_EXPORTER_OTLP_ENDPOINT=http://jaeger:4318`. See
|
||||
/// `tracing-utils` package description.
|
||||
///
|
||||
pub fn init_tracing_and_logging(default_log_level: &str) -> anyhow::Result<()> {
|
||||
pub async fn init_tracing_and_logging(default_log_level: &str) -> anyhow::Result<()> {
|
||||
// Initialize Logging
|
||||
let env_filter = tracing_subscriber::EnvFilter::try_from_default_env()
|
||||
.unwrap_or_else(|_| tracing_subscriber::EnvFilter::new(default_log_level));
|
||||
@@ -22,7 +22,7 @@ pub fn init_tracing_and_logging(default_log_level: &str) -> anyhow::Result<()> {
|
||||
.with_writer(std::io::stderr);
|
||||
|
||||
// Initialize OpenTelemetry
|
||||
let otlp_layer = tracing_utils::init_tracing_without_runtime("compute_ctl");
|
||||
let otlp_layer = tracing_utils::init_tracing("compute_ctl").await;
|
||||
|
||||
// Put it all together
|
||||
tracing_subscriber::registry()
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
use anyhow::{Context, Result};
|
||||
use fail::fail_point;
|
||||
use postgres::{Client, Transaction};
|
||||
use tokio_postgres::{Client, Transaction};
|
||||
use tracing::{error, info};
|
||||
|
||||
use crate::metrics::DB_MIGRATION_FAILED;
|
||||
@@ -21,10 +21,11 @@ impl<'m> MigrationRunner<'m> {
|
||||
}
|
||||
|
||||
/// Get the current value neon_migration.migration_id
|
||||
fn get_migration_id(&mut self) -> Result<i64> {
|
||||
async fn get_migration_id(&mut self) -> Result<i64> {
|
||||
let row = self
|
||||
.client
|
||||
.query_one("SELECT id FROM neon_migration.migration_id", &[])?;
|
||||
.query_one("SELECT id FROM neon_migration.migration_id", &[])
|
||||
.await?;
|
||||
|
||||
Ok(row.get::<&str, i64>("id"))
|
||||
}
|
||||
@@ -34,7 +35,7 @@ impl<'m> MigrationRunner<'m> {
|
||||
/// This function has a fail point called compute-migration, which can be
|
||||
/// used if you would like to fail the application of a series of migrations
|
||||
/// at some point.
|
||||
fn update_migration_id(txn: &mut Transaction, migration_id: i64) -> Result<()> {
|
||||
async fn update_migration_id(txn: &mut Transaction<'_>, migration_id: i64) -> Result<()> {
|
||||
// We use this fail point in order to check that failing in the
|
||||
// middle of applying a series of migrations fails in an expected
|
||||
// manner
|
||||
@@ -59,31 +60,38 @@ impl<'m> MigrationRunner<'m> {
|
||||
"UPDATE neon_migration.migration_id SET id = $1",
|
||||
&[&migration_id],
|
||||
)
|
||||
.await
|
||||
.with_context(|| format!("update neon_migration.migration_id to {migration_id}"))?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Prepare the migrations the target database for handling migrations
|
||||
fn prepare_database(&mut self) -> Result<()> {
|
||||
async fn prepare_database(&mut self) -> Result<()> {
|
||||
self.client
|
||||
.simple_query("CREATE SCHEMA IF NOT EXISTS neon_migration")?;
|
||||
self.client.simple_query("CREATE TABLE IF NOT EXISTS neon_migration.migration_id (key INT NOT NULL PRIMARY KEY, id bigint NOT NULL DEFAULT 0)")?;
|
||||
self.client.simple_query(
|
||||
"INSERT INTO neon_migration.migration_id VALUES (0, 0) ON CONFLICT DO NOTHING",
|
||||
)?;
|
||||
.simple_query("CREATE SCHEMA IF NOT EXISTS neon_migration")
|
||||
.await?;
|
||||
self.client.simple_query("CREATE TABLE IF NOT EXISTS neon_migration.migration_id (key INT NOT NULL PRIMARY KEY, id bigint NOT NULL DEFAULT 0)").await?;
|
||||
self.client
|
||||
.simple_query("ALTER SCHEMA neon_migration OWNER TO cloud_admin")?;
|
||||
.simple_query(
|
||||
"INSERT INTO neon_migration.migration_id VALUES (0, 0) ON CONFLICT DO NOTHING",
|
||||
)
|
||||
.await?;
|
||||
self.client
|
||||
.simple_query("REVOKE ALL ON SCHEMA neon_migration FROM PUBLIC")?;
|
||||
.simple_query("ALTER SCHEMA neon_migration OWNER TO cloud_admin")
|
||||
.await?;
|
||||
self.client
|
||||
.simple_query("REVOKE ALL ON SCHEMA neon_migration FROM PUBLIC")
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Run an individual migration in a separate transaction block.
|
||||
fn run_migration(client: &mut Client, migration_id: i64, migration: &str) -> Result<()> {
|
||||
async fn run_migration(client: &mut Client, migration_id: i64, migration: &str) -> Result<()> {
|
||||
let mut txn = client
|
||||
.transaction()
|
||||
.await
|
||||
.with_context(|| format!("begin transaction for migration {migration_id}"))?;
|
||||
|
||||
if migration.starts_with("-- SKIP") {
|
||||
@@ -92,35 +100,38 @@ impl<'m> MigrationRunner<'m> {
|
||||
// Even though we are skipping the migration, updating the
|
||||
// migration ID should help keep logic easy to understand when
|
||||
// trying to understand the state of a cluster.
|
||||
Self::update_migration_id(&mut txn, migration_id)?;
|
||||
Self::update_migration_id(&mut txn, migration_id).await?;
|
||||
} else {
|
||||
info!("Running migration id={}:\n{}\n", migration_id, migration);
|
||||
|
||||
txn.simple_query(migration)
|
||||
.await
|
||||
.with_context(|| format!("apply migration {migration_id}"))?;
|
||||
|
||||
Self::update_migration_id(&mut txn, migration_id)?;
|
||||
Self::update_migration_id(&mut txn, migration_id).await?;
|
||||
}
|
||||
|
||||
txn.commit()
|
||||
.await
|
||||
.with_context(|| format!("commit transaction for migration {migration_id}"))?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Run the configured set of migrations
|
||||
pub fn run_migrations(mut self) -> Result<()> {
|
||||
pub async fn run_migrations(mut self) -> Result<()> {
|
||||
self.prepare_database()
|
||||
.await
|
||||
.context("prepare database to handle migrations")?;
|
||||
|
||||
let mut current_migration = self.get_migration_id()? as usize;
|
||||
let mut current_migration = self.get_migration_id().await? as usize;
|
||||
while current_migration < self.migrations.len() {
|
||||
// The index lags the migration ID by 1, so the current migration
|
||||
// ID is also the next index
|
||||
let migration_id = (current_migration + 1) as i64;
|
||||
let migration = self.migrations[current_migration];
|
||||
|
||||
match Self::run_migration(self.client, migration_id, migration) {
|
||||
match Self::run_migration(self.client, migration_id, migration).await {
|
||||
Ok(_) => {
|
||||
info!("Finished migration id={}", migration_id);
|
||||
}
|
||||
|
||||
@@ -7,7 +7,6 @@ use std::os::unix::fs::PermissionsExt;
|
||||
use std::path::Path;
|
||||
use std::process::Child;
|
||||
use std::str::FromStr;
|
||||
use std::thread::JoinHandle;
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use anyhow::{bail, Result};
|
||||
@@ -16,6 +15,7 @@ use ini::Ini;
|
||||
use notify::{RecursiveMode, Watcher};
|
||||
use postgres::config::Config;
|
||||
use tokio::io::AsyncBufReadExt;
|
||||
use tokio::task::JoinHandle;
|
||||
use tokio::time::timeout;
|
||||
use tokio_postgres;
|
||||
use tokio_postgres::NoTls;
|
||||
@@ -477,23 +477,13 @@ pub async fn tune_pgbouncer(pgbouncer_config: HashMap<String, String>) -> Result
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Spawn a thread that will read Postgres logs from `stderr`, join multiline logs
|
||||
/// Spawn a task that will read Postgres logs from `stderr`, join multiline logs
|
||||
/// and send them to the logger. In the future we may also want to add context to
|
||||
/// these logs.
|
||||
pub fn handle_postgres_logs(stderr: std::process::ChildStderr) -> JoinHandle<()> {
|
||||
std::thread::spawn(move || {
|
||||
let runtime = tokio::runtime::Builder::new_current_thread()
|
||||
.enable_all()
|
||||
.build()
|
||||
.expect("failed to build tokio runtime");
|
||||
|
||||
let res = runtime.block_on(async move {
|
||||
let stderr = tokio::process::ChildStderr::from_std(stderr)?;
|
||||
handle_postgres_logs_async(stderr).await
|
||||
});
|
||||
if let Err(e) = res {
|
||||
tracing::error!("error while processing postgres logs: {}", e);
|
||||
}
|
||||
pub fn handle_postgres_logs(stderr: std::process::ChildStderr) -> JoinHandle<Result<()>> {
|
||||
tokio::spawn(async move {
|
||||
let stderr = tokio::process::ChildStderr::from_std(stderr)?;
|
||||
handle_postgres_logs_async(stderr).await
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
@@ -1,8 +1,8 @@
|
||||
use anyhow::{anyhow, bail, Result};
|
||||
use postgres::Client;
|
||||
use reqwest::StatusCode;
|
||||
use std::fs::File;
|
||||
use std::path::Path;
|
||||
use tokio_postgres::Client;
|
||||
use tracing::{error, info, instrument, warn};
|
||||
|
||||
use crate::config;
|
||||
@@ -166,17 +166,17 @@ pub fn add_standby_signal(pgdata_path: &Path) -> Result<()> {
|
||||
}
|
||||
|
||||
#[instrument(skip_all)]
|
||||
pub fn handle_neon_extension_upgrade(client: &mut Client) -> Result<()> {
|
||||
pub async fn handle_neon_extension_upgrade(client: &mut Client) -> Result<()> {
|
||||
info!("handle neon extension upgrade");
|
||||
let query = "ALTER EXTENSION neon UPDATE";
|
||||
info!("update neon extension version with query: {}", query);
|
||||
client.simple_query(query)?;
|
||||
client.simple_query(query).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[instrument(skip_all)]
|
||||
pub fn handle_migrations(client: &mut Client) -> Result<()> {
|
||||
pub async fn handle_migrations(client: &mut Client) -> Result<()> {
|
||||
info!("handle migrations");
|
||||
|
||||
// !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
|
||||
@@ -206,7 +206,9 @@ pub fn handle_migrations(client: &mut Client) -> Result<()> {
|
||||
),
|
||||
];
|
||||
|
||||
MigrationRunner::new(client, &migrations).run_migrations()?;
|
||||
MigrationRunner::new(client, &migrations)
|
||||
.run_migrations()
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -214,7 +216,7 @@ pub fn handle_migrations(client: &mut Client) -> Result<()> {
|
||||
/// Connect to the database as superuser and pre-create anon extension
|
||||
/// if it is present in shared_preload_libraries
|
||||
#[instrument(skip_all)]
|
||||
pub fn handle_extension_anon(
|
||||
pub async fn handle_extension_anon(
|
||||
spec: &ComputeSpec,
|
||||
db_owner: &str,
|
||||
db_client: &mut Client,
|
||||
@@ -227,7 +229,7 @@ pub fn handle_extension_anon(
|
||||
if !grants_only {
|
||||
// check if extension is already initialized using anon.is_initialized()
|
||||
let query = "SELECT anon.is_initialized()";
|
||||
match db_client.query(query, &[]) {
|
||||
match db_client.query(query, &[]).await {
|
||||
Ok(rows) => {
|
||||
if !rows.is_empty() {
|
||||
let is_initialized: bool = rows[0].get(0);
|
||||
@@ -249,7 +251,7 @@ pub fn handle_extension_anon(
|
||||
// Users cannot create it themselves, because superuser is required.
|
||||
let mut query = "CREATE EXTENSION IF NOT EXISTS anon CASCADE";
|
||||
info!("creating anon extension with query: {}", query);
|
||||
match db_client.query(query, &[]) {
|
||||
match db_client.query(query, &[]).await {
|
||||
Ok(_) => {}
|
||||
Err(e) => {
|
||||
error!("anon extension creation failed with error: {}", e);
|
||||
@@ -259,7 +261,7 @@ pub fn handle_extension_anon(
|
||||
|
||||
// check that extension is installed
|
||||
query = "SELECT extname FROM pg_extension WHERE extname = 'anon'";
|
||||
let rows = db_client.query(query, &[])?;
|
||||
let rows = db_client.query(query, &[]).await?;
|
||||
if rows.is_empty() {
|
||||
error!("anon extension is not installed");
|
||||
return Ok(());
|
||||
@@ -268,7 +270,7 @@ pub fn handle_extension_anon(
|
||||
// Initialize anon extension
|
||||
// This also requires superuser privileges, so users cannot do it themselves.
|
||||
query = "SELECT anon.init()";
|
||||
match db_client.query(query, &[]) {
|
||||
match db_client.query(query, &[]).await {
|
||||
Ok(_) => {}
|
||||
Err(e) => {
|
||||
error!("anon.init() failed with error: {}", e);
|
||||
@@ -279,7 +281,7 @@ pub fn handle_extension_anon(
|
||||
|
||||
// check that extension is installed, if not bail early
|
||||
let query = "SELECT extname FROM pg_extension WHERE extname = 'anon'";
|
||||
match db_client.query(query, &[]) {
|
||||
match db_client.query(query, &[]).await {
|
||||
Ok(rows) => {
|
||||
if rows.is_empty() {
|
||||
error!("anon extension is not installed");
|
||||
@@ -294,12 +296,12 @@ pub fn handle_extension_anon(
|
||||
|
||||
let query = format!("GRANT ALL ON SCHEMA anon TO {}", db_owner);
|
||||
info!("granting anon extension permissions with query: {}", query);
|
||||
db_client.simple_query(&query)?;
|
||||
db_client.simple_query(&query).await?;
|
||||
|
||||
// Grant permissions to db_owner to use anon extension functions
|
||||
let query = format!("GRANT ALL ON ALL FUNCTIONS IN SCHEMA anon TO {}", db_owner);
|
||||
info!("granting anon extension permissions with query: {}", query);
|
||||
db_client.simple_query(&query)?;
|
||||
db_client.simple_query(&query).await?;
|
||||
|
||||
// This is needed, because some functions are defined as SECURITY DEFINER.
|
||||
// In Postgres SECURITY DEFINER functions are executed with the privileges
|
||||
@@ -314,16 +316,16 @@ pub fn handle_extension_anon(
|
||||
where nsp.nspname = 'anon';", db_owner);
|
||||
|
||||
info!("change anon extension functions owner to db owner");
|
||||
db_client.simple_query(&query)?;
|
||||
db_client.simple_query(&query).await?;
|
||||
|
||||
// affects views as well
|
||||
let query = format!("GRANT ALL ON ALL TABLES IN SCHEMA anon TO {}", db_owner);
|
||||
info!("granting anon extension permissions with query: {}", query);
|
||||
db_client.simple_query(&query)?;
|
||||
db_client.simple_query(&query).await?;
|
||||
|
||||
let query = format!("GRANT ALL ON ALL SEQUENCES IN SCHEMA anon TO {}", db_owner);
|
||||
info!("granting anon extension permissions with query: {}", query);
|
||||
db_client.simple_query(&query)?;
|
||||
db_client.simple_query(&query).await?;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -665,6 +665,22 @@ impl Endpoint {
|
||||
.to_str()
|
||||
.unwrap(),
|
||||
])
|
||||
// TODO: It would be nice if we generated compute IDs with the same
|
||||
// algorithm as the real control plane.
|
||||
//
|
||||
// TODO: Add this back when
|
||||
// https://github.com/neondatabase/neon/pull/10747 is merged.
|
||||
//
|
||||
//.args([
|
||||
// "--compute-id",
|
||||
// &format!(
|
||||
// "compute-{}",
|
||||
// SystemTime::now()
|
||||
// .duration_since(UNIX_EPOCH)
|
||||
// .unwrap()
|
||||
// .as_secs()
|
||||
// ),
|
||||
//])
|
||||
.stdin(std::process::Stdio::null())
|
||||
.stderr(logfile.try_clone()?)
|
||||
.stdout(logfile);
|
||||
|
||||
@@ -2,4 +2,4 @@
|
||||
set -ex
|
||||
cd "$(dirname ${0})"
|
||||
patch -p1 <test-upgrade.patch
|
||||
pg_prove test.sql
|
||||
pg_prove -d contrib_regression test.sql
|
||||
@@ -285,10 +285,10 @@ To summarize, list of cplane changes:
|
||||
|
||||
### storage_controller implementation
|
||||
|
||||
Current 'load everything on startup and keep in memory' easy design is fine.
|
||||
Single timeline shouldn't take more than 100 bytes (it's 16 byte tenant_id, 16
|
||||
byte timeline_id, int generation, vec of ~3 safekeeper ids plus some flags), so
|
||||
10^6 of timelines shouldn't take more than 100MB.
|
||||
If desired, we may continue using current 'load everything on startup and keep
|
||||
in memory' approach: single timeline shouldn't take more than 100 bytes (it's 16
|
||||
byte tenant_id, 16 byte timeline_id, int generation, vec of ~3 safekeeper ids
|
||||
plus some flags), so 10^6 of timelines shouldn't take more than 100MB.
|
||||
|
||||
Similar to pageserver attachment Intents storage_controller would have in-memory
|
||||
`MigrationRequest` (or its absense) for each timeline and pool of tasks trying
|
||||
@@ -296,7 +296,7 @@ to make these request reality; this ensures one instance of storage_controller
|
||||
won't do several migrations on the same timeline concurrently. In the first
|
||||
version it is simpler to have more manual control and no retries, i.e. migration
|
||||
failure removes the request. Later we can build retries and automatic
|
||||
scheduling/migration. `MigrationRequest` is
|
||||
scheduling/migration around. `MigrationRequest` is
|
||||
```
|
||||
enum MigrationRequest {
|
||||
To(Vec<NodeId>),
|
||||
@@ -313,9 +313,9 @@ similarly, in the first version it is ok to trigger it manually).
|
||||
#### Schema
|
||||
|
||||
`safekeepers` table mirroring current `nodes` should be added, except that for
|
||||
`scheduling_policy` field (seems like `status` is a better name for it): it is enough
|
||||
to have at least in the beginning only 3 fields: 1) `active` 2) `offline` 3)
|
||||
`decomissioned`.
|
||||
`scheduling_policy`: it is enough to have at least in the beginning only 3
|
||||
fields: 1) `active` 2) `paused` (initially means only not assign new tlis there
|
||||
3) `decomissioned` (node is removed).
|
||||
|
||||
`timelines` table:
|
||||
```
|
||||
@@ -324,18 +324,24 @@ table! {
|
||||
timelines (tenant_id, timeline_id) {
|
||||
timeline_id -> Varchar,
|
||||
tenant_id -> Varchar,
|
||||
start_lsn -> pg_lsn,
|
||||
generation -> Int4,
|
||||
sk_set -> Array<Int4>, // list of safekeeper ids
|
||||
new_sk_set -> Nullable<Array<Int4>>, // list of safekeeper ids, null if not joint conf
|
||||
new_sk_set -> Nullable<Array<Int8>>, // list of safekeeper ids, null if not joint conf
|
||||
cplane_notified_generation -> Int4,
|
||||
deleted_at -> Nullable<Timestamptz>,
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
`start_lsn` is needed to create timeline on safekeepers properly, see below. We
|
||||
might also want to add ancestor_timeline_id to preserve the hierarchy, but for
|
||||
this RFC it is not needed.
|
||||
|
||||
#### API
|
||||
|
||||
Node management is similar to pageserver:
|
||||
1) POST `/control/v1/safekeepers` upserts safekeeper.
|
||||
1) POST `/control/v1/safekeepers` inserts safekeeper.
|
||||
2) GET `/control/v1/safekeepers` lists safekeepers.
|
||||
3) GET `/control/v1/safekeepers/:node_id` gets safekeeper.
|
||||
4) PUT `/control/v1/safekepers/:node_id/status` changes status to e.g.
|
||||
@@ -345,25 +351,15 @@ Node management is similar to pageserver:
|
||||
Safekeeper deploy scripts should register safekeeper at storage_contorller as
|
||||
they currently do with cplane, under the same id.
|
||||
|
||||
Timeline creation/deletion: already existing POST `tenant/:tenant_id/timeline`
|
||||
would 1) choose initial set of safekeepers; 2) write to the db initial
|
||||
`Configuration` with `INSERT ON CONFLICT DO NOTHING` returning existing row in
|
||||
case of conflict; 3) create timeline on the majority of safekeepers (already
|
||||
created is ok).
|
||||
Timeline creation/deletion will work through already existing POST and DELETE
|
||||
`tenant/:tenant_id/timeline`. Cplane is expected to retry both until they
|
||||
succeed. See next section on the implementation details.
|
||||
|
||||
We don't want to block timeline creation when one safekeeper is down. Currently
|
||||
this is solved by compute implicitly creating timeline on any safekeeper it is
|
||||
connected to. This creates ugly timeline state on safekeeper when timeline is
|
||||
created, but start LSN is not defined yet. It would be nice to remove this; to
|
||||
do that, controller can in the background retry to create timeline on
|
||||
safekeeper(s) which missed that during initial creation call. It can do that
|
||||
through `pull_timeline` from majority so it doesn't need to remember
|
||||
`parent_lsn` in its db.
|
||||
|
||||
Timeline deletion removes the row from the db and forwards deletion to the
|
||||
current configuration members. Without additional actions deletions might leak,
|
||||
see below on this; initially let's ignore these, reporting to cplane success if
|
||||
at least one safekeeper deleted the timeline (this will remove s3 data).
|
||||
We don't want to block timeline creation/deletion when one safekeeper is down.
|
||||
Currently this is crutched by compute implicitly creating timeline on any
|
||||
safekeeper it is connected to. This creates ugly timeline state on safekeeper
|
||||
when timeline is created, but start LSN is not defined yet. Next section
|
||||
describes dealing with this.
|
||||
|
||||
Tenant deletion repeats timeline deletion for all timelines.
|
||||
|
||||
@@ -395,26 +391,6 @@ Similar call should be added for the tenant.
|
||||
It would be great to have some way of subscribing to the results (apart from
|
||||
looking at logs/metrics).
|
||||
|
||||
Migration is executed as described above. One subtlety is that (local) deletion on
|
||||
source safekeeper might fail, which is not a problem if we are going to
|
||||
decomission the node but leaves garbage otherwise. I'd propose in the first version
|
||||
1) Don't attempt deletion at all if node status is `offline`.
|
||||
2) If it failed, just issue warning.
|
||||
And add PUT `/control/v1/safekeepers/:node_id/scrub` endpoint which would find and
|
||||
remove garbage timelines for manual use. It will 1) list all timelines on the
|
||||
safekeeper 2) compare each one against configuration storage: if timeline
|
||||
doesn't exist at all (had been deleted), it can be deleted. Otherwise, it can
|
||||
be deleted under generation number if node is not member of current generation.
|
||||
|
||||
Automating this is untrivial; we'd need to register all potential missing
|
||||
deletions <tenant_id, timeline_id, generation, node_id> in the same transaction
|
||||
which switches configurations. Similarly when timeline is fully deleted to
|
||||
prevent cplane operation from blocking when some safekeeper is not available
|
||||
deletion should be also registered.
|
||||
|
||||
One more task pool should infinitely retry notifying control plane about changed
|
||||
safekeeper sets.
|
||||
|
||||
3) GET `/control/v1/tenant/:tenant_id/timeline/:timeline_id/` should return
|
||||
current in memory state of the timeline and pending `MigrationRequest`,
|
||||
if any.
|
||||
@@ -423,12 +399,153 @@ safekeeper sets.
|
||||
migration by switching configuration from the joint to the one with (previous) `sk_set` under CAS
|
||||
(incrementing generation as always).
|
||||
|
||||
#### API implementation and reconciliation
|
||||
|
||||
For timeline creation/deletion we want to preserve the basic assumption that
|
||||
unreachable minority (1 sk of 3) doesn't block their completion, but eventually
|
||||
we want to finish creation/deletion on nodes which missed it (unless they are
|
||||
removed). Similarly for migration; it may and should finish even though excluded
|
||||
members missed their exclusion. And of course e.g. such pending exclusion on
|
||||
node C after migration ABC -> ABD must not prevent next migration ABD -> ABE. As
|
||||
another example, if some node missed timeline creation it clearly must not block
|
||||
migration from it. Hence it is natural to have per safekeeper background
|
||||
reconciler which retries these ops until they succeed. There are 3 possible
|
||||
operation types, and the type is defined by timeline state (membership
|
||||
configuration and whether it is deleted) and safekeeper id: we may need to
|
||||
create timeline on sk (node added), locally delete it (node excluded, somewhat
|
||||
similar to detach) or globally delete it (timeline is deleted).
|
||||
|
||||
Next, on storage controller restart in principle these pending operations can be
|
||||
figured out by comparing safekeepers state against storcon state. But it seems
|
||||
better to me to materialize them in the database; it is not expensive, avoids
|
||||
these startup scans which themselves can fail etc and makes it very easy to see
|
||||
outstanding work directly at the source of truth -- the db. So we can add table
|
||||
`safekeeper_timeline_pending_ops`
|
||||
```
|
||||
table! {
|
||||
// timeline_id, sk_id is primary key
|
||||
safekeeper_timeline_pending_ops (sk_id, tenant_id, timeline_id) {
|
||||
sk_id -> int8,
|
||||
tenant_id -> Varchar,
|
||||
timeline_id -> Varchar,
|
||||
generation -> Int4,
|
||||
op_type -> Varchar,
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
`op_type` can be `include` (seed from peers and ensure generation is up to
|
||||
date), `exclude` (remove locally) and `delete`. Field is actually not strictly
|
||||
needed as it can be computed from current configuration, but gives more explicit
|
||||
observability.
|
||||
|
||||
`generation` is necessary there because after op is done reconciler must remove
|
||||
it and not remove another row with higher gen which in theory might appear.
|
||||
|
||||
Any insert of row should overwrite (remove) all rows with the same sk and
|
||||
timeline id but lower `generation` as next op makes previous obsolete. Insertion
|
||||
of `op_type` `delete` overwrites all rows.
|
||||
|
||||
About `exclude`: rather than adding explicit safekeeper http endpoint, it is
|
||||
reasonable to reuse membership switch endpoint: if safekeeper is not member
|
||||
of the configuration it locally removes the timeline on the switch. In this case
|
||||
404 should also be considered an 'ok' answer by the caller.
|
||||
|
||||
So, main loop of per sk reconcile reads `safekeeper_timeline_pending_ops`
|
||||
joined with timeline configuration to get current conf (with generation `n`)
|
||||
for the safekeeper and does the jobs, infinitely retrying failures:
|
||||
1) If node is member (`include`):
|
||||
- Check if timeline exists on it, if not, call pull_timeline on it from
|
||||
other members
|
||||
- Call switch configuration to the current
|
||||
2) If node is not member (`exclude`):
|
||||
- Call switch configuration to the current, 404 is ok.
|
||||
3) If timeline is deleted (`delete`), call delete.
|
||||
|
||||
In cases 1 and 2 remove `safekeeper_timeline_pending_ops` for the sk and
|
||||
timeline with generation <= `n` if `op_type` is not `delete`.
|
||||
In case 3 also remove `safekeeper_timeline_pending_ops`
|
||||
entry + remove `timelines` entry if there is nothing left in `safekeeper_timeline_pending_ops` for the timeline.
|
||||
|
||||
Let's consider in details how APIs can be implemented from this angle.
|
||||
|
||||
Timeline creation. It is assumed that cplane retries it until success, so all
|
||||
actions must be idempotent. Now, a tricky point here is timeline start LSN. For
|
||||
the initial (tenant creation) call cplane doesn't know it. However, setting
|
||||
start_lsn on safekeepers during creation is a good thing -- it provides a
|
||||
guarantee that walproposer can always find a common point in WAL histories of
|
||||
safekeeper and its own, and so absense of it would be a clear sign of
|
||||
corruption. The following sequence works:
|
||||
1) Create timeline (or observe that it exists) on pageserver,
|
||||
figuring out last_record_lsn in response.
|
||||
2) Choose safekeepers and insert (ON CONFLICT DO NOTHING) timeline row into the
|
||||
db. Note that last_record_lsn returned on the previous step is movable as it
|
||||
changes once ingestion starts, insert must not overwrite it (as well as other
|
||||
fields like membership conf). On the contrary, start_lsn used in the next
|
||||
step must be set to the value in the db. cplane_notified_generation can be set
|
||||
to 1 (initial generation) in insert to avoid notifying cplane about initial
|
||||
conf as cplane will receive it in timeline creation request anyway.
|
||||
3) Issue timeline creation calls to at least majority of safekeepers. Using
|
||||
majority here is not necessary but handy because it guarantees that any live
|
||||
majority will have at least one sk with created timeline and so
|
||||
reconciliation task can use pull_timeline shared with migration instead of
|
||||
create timeline special init case. OFC if timeline is already exists call is
|
||||
ignored.
|
||||
4) For minority of safekeepers which could have missed creation insert
|
||||
entries to `safekeeper_timeline_pending_ops`. We won't miss this insertion
|
||||
because response to cplane is sent only after it has happened, and cplane
|
||||
retries the call until 200 response.
|
||||
|
||||
There is a small question how request handler (timeline creation in this
|
||||
case) would interact with per sk reconciler. As always I prefer to do the
|
||||
simplest possible thing and here it seems to be just waking it up so it
|
||||
re-reads the db for work to do. Passing work in memory is faster, but
|
||||
that shouldn't matter, and path to scan db for work will exist anyway,
|
||||
simpler to reuse it.
|
||||
|
||||
For pg version / wal segment size: while we may persist them in `timelines`
|
||||
table, it is not necessary as initial creation at step 3 can take them from
|
||||
pageserver or cplane creation call and later pull_timeline will carry them
|
||||
around.
|
||||
|
||||
Timeline migration.
|
||||
1) CAS to the db to create joint conf, and in the same transaction create
|
||||
`safekeeper_timeline_pending_ops` `include` entries to initialize new members
|
||||
as well as deliver this conf to current ones; poke per sk reconcilers to work
|
||||
on it. Also any conf change should also poke cplane notifier task(s).
|
||||
2) Once it becomes possible per alg description above, get out of joint conf
|
||||
with another CAS. Task should get wakeups from per sk reconcilers because
|
||||
conf switch is required for advancement; however retries should be sleep
|
||||
based as well as LSN advancement might be needed, though in happy path
|
||||
it isn't. To see whether further transition is possible on wakup migration
|
||||
executor polls safekeepers per the algorithm. CAS creating new conf with only
|
||||
new members should again insert entries to `safekeeper_timeline_pending_ops`
|
||||
to switch them there, as well as `exclude` rows to remove timeline from
|
||||
old members.
|
||||
|
||||
Timeline deletion: just set `deleted_at` on the timeline row and insert
|
||||
`safekeeper_timeline_pending_ops` entries in the same xact, the rest is done by
|
||||
per sk reconcilers.
|
||||
|
||||
When node is removed (set to `decomissioned`), `safekeeper_timeline_pending_ops`
|
||||
for it must be cleared in the same transaction.
|
||||
|
||||
One more task pool should infinitely retry notifying control plane about changed
|
||||
safekeeper sets (trying making `cplane_notified_generation` equal `generation`).
|
||||
|
||||
#### Dealing with multiple instances of storage_controller
|
||||
|
||||
Operations described above executed concurrently might create some errors but do
|
||||
not prevent progress, so while we normally don't want to run multiple instances
|
||||
of storage_controller it is fine to have it temporarily, e.g. during redeploy.
|
||||
|
||||
To harden against some controller instance creating some work in
|
||||
`safekeeper_timeline_pending_ops` and then disappearing without anyone pickup up
|
||||
the job per sk reconcilers apart from explicit wakups should scan for work
|
||||
periodically. It is possible to remove that though if all db updates are
|
||||
protected with leadership token/term -- then such scans are needed only after
|
||||
leadership is acquired.
|
||||
|
||||
Any interactions with db update in-memory controller state, e.g. if migration
|
||||
request failed because different one is in progress, controller remembers that
|
||||
and tries to finish it.
|
||||
@@ -545,7 +662,7 @@ Aurora does this but similarly I don't think this is needed.
|
||||
|
||||
We should use Compute <-> safekeeper protocol change to include other (long
|
||||
yearned) modifications:
|
||||
- send data in network order to make arm work.
|
||||
- send data in network order without putting whole structs to be arch independent
|
||||
- remove term_start_lsn from AppendRequest
|
||||
- add horizon to TermHistory
|
||||
- add to ProposerGreeting number of connection from this wp to sk
|
||||
|
||||
@@ -94,6 +94,7 @@ pub struct ConfigToml {
|
||||
pub ondemand_download_behavior_treat_error_as_warn: bool,
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub background_task_maximum_delay: Duration,
|
||||
pub use_compaction_semaphore: bool,
|
||||
pub control_plane_api: Option<reqwest::Url>,
|
||||
pub control_plane_api_token: Option<String>,
|
||||
pub control_plane_emergency_mode: bool,
|
||||
@@ -121,6 +122,7 @@ pub struct ConfigToml {
|
||||
pub wal_receiver_protocol: PostgresClientProtocol,
|
||||
pub page_service_pipelining: PageServicePipeliningConfig,
|
||||
pub get_vectored_concurrent_io: GetVectoredConcurrentIo,
|
||||
pub enable_read_path_debugging: Option<bool>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
|
||||
@@ -470,6 +472,7 @@ impl Default for ConfigToml {
|
||||
DEFAULT_BACKGROUND_TASK_MAXIMUM_DELAY,
|
||||
)
|
||||
.unwrap()),
|
||||
use_compaction_semaphore: false,
|
||||
|
||||
control_plane_api: (None),
|
||||
control_plane_api_token: (None),
|
||||
@@ -510,6 +513,11 @@ impl Default for ConfigToml {
|
||||
} else {
|
||||
GetVectoredConcurrentIo::SidecarTask
|
||||
},
|
||||
enable_read_path_debugging: if cfg!(test) || cfg!(feature = "testing") {
|
||||
Some(true)
|
||||
} else {
|
||||
None
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
use std::fmt::{Debug, Display};
|
||||
use std::time::Duration;
|
||||
|
||||
use futures::Future;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
@@ -29,6 +30,11 @@ pub async fn exponential_backoff(
|
||||
}
|
||||
}
|
||||
|
||||
pub fn exponential_backoff_duration(n: u32, base_increment: f64, max_seconds: f64) -> Duration {
|
||||
let seconds = exponential_backoff_duration_seconds(n, base_increment, max_seconds);
|
||||
Duration::from_secs_f64(seconds)
|
||||
}
|
||||
|
||||
pub fn exponential_backoff_duration_seconds(n: u32, base_increment: f64, max_seconds: f64) -> f64 {
|
||||
if n == 0 {
|
||||
0.0
|
||||
|
||||
@@ -140,6 +140,10 @@ pub struct PageServerConf {
|
||||
/// not terrible.
|
||||
pub background_task_maximum_delay: Duration,
|
||||
|
||||
/// If true, use a separate semaphore for compaction tasks instead of the common background task
|
||||
/// semaphore. Defaults to false.
|
||||
pub use_compaction_semaphore: bool,
|
||||
|
||||
pub control_plane_api: Option<Url>,
|
||||
|
||||
/// JWT token for use with the control plane API.
|
||||
@@ -193,6 +197,10 @@ pub struct PageServerConf {
|
||||
pub page_service_pipelining: pageserver_api::config::PageServicePipeliningConfig,
|
||||
|
||||
pub get_vectored_concurrent_io: pageserver_api::config::GetVectoredConcurrentIo,
|
||||
|
||||
/// Enable read path debugging. If enabled, read key errors will print a backtrace of the layer
|
||||
/// files read.
|
||||
pub enable_read_path_debugging: bool,
|
||||
}
|
||||
|
||||
/// Token for authentication to safekeepers
|
||||
@@ -332,6 +340,7 @@ impl PageServerConf {
|
||||
test_remote_failures,
|
||||
ondemand_download_behavior_treat_error_as_warn,
|
||||
background_task_maximum_delay,
|
||||
use_compaction_semaphore,
|
||||
control_plane_api,
|
||||
control_plane_api_token,
|
||||
control_plane_emergency_mode,
|
||||
@@ -355,6 +364,7 @@ impl PageServerConf {
|
||||
wal_receiver_protocol,
|
||||
page_service_pipelining,
|
||||
get_vectored_concurrent_io,
|
||||
enable_read_path_debugging,
|
||||
} = config_toml;
|
||||
|
||||
let mut conf = PageServerConf {
|
||||
@@ -385,6 +395,7 @@ impl PageServerConf {
|
||||
test_remote_failures,
|
||||
ondemand_download_behavior_treat_error_as_warn,
|
||||
background_task_maximum_delay,
|
||||
use_compaction_semaphore,
|
||||
control_plane_api,
|
||||
control_plane_emergency_mode,
|
||||
heatmap_upload_concurrency,
|
||||
@@ -440,6 +451,7 @@ impl PageServerConf {
|
||||
.unwrap_or_default(),
|
||||
virtual_file_io_mode: virtual_file_io_mode.unwrap_or(virtual_file::IoMode::preferred()),
|
||||
no_sync: no_sync.unwrap_or(false),
|
||||
enable_read_path_debugging: enable_read_path_debugging.unwrap_or(false),
|
||||
};
|
||||
|
||||
// ------------------------------------------------------------
|
||||
|
||||
@@ -8,7 +8,6 @@ use std::time::Duration;
|
||||
|
||||
use crate::controller_upcall_client::ControlPlaneGenerationsApi;
|
||||
use crate::metrics;
|
||||
use crate::tenant::remote_timeline_client::remote_layer_path;
|
||||
use crate::tenant::remote_timeline_client::remote_timeline_path;
|
||||
use crate::tenant::remote_timeline_client::LayerFileMetadata;
|
||||
use crate::virtual_file::MaybeFatalIo;
|
||||
@@ -463,45 +462,18 @@ impl DeletionQueueClient {
|
||||
///
|
||||
/// The `current_generation` is the generation of this pageserver's current attachment. The
|
||||
/// generations in `layers` are the generations in which those layers were written.
|
||||
pub(crate) async fn push_layers(
|
||||
pub(crate) fn push_layers(
|
||||
&self,
|
||||
tenant_shard_id: TenantShardId,
|
||||
timeline_id: TimelineId,
|
||||
current_generation: Generation,
|
||||
layers: Vec<(LayerName, LayerFileMetadata)>,
|
||||
) -> Result<(), DeletionQueueError> {
|
||||
if current_generation.is_none() {
|
||||
debug!("Enqueuing deletions in legacy mode, skipping queue");
|
||||
// None generations are not valid for attached tenants: they must always be attached in
|
||||
// a known generation. None generations are still permitted for layers in the index because
|
||||
// they may be historical.
|
||||
assert!(!current_generation.is_none());
|
||||
|
||||
let mut layer_paths = Vec::new();
|
||||
for (layer, meta) in layers {
|
||||
layer_paths.push(remote_layer_path(
|
||||
&tenant_shard_id.tenant_id,
|
||||
&timeline_id,
|
||||
meta.shard,
|
||||
&layer,
|
||||
meta.generation,
|
||||
));
|
||||
}
|
||||
self.push_immediate(layer_paths).await?;
|
||||
return self.flush_immediate().await;
|
||||
}
|
||||
|
||||
self.push_layers_sync(tenant_shard_id, timeline_id, current_generation, layers)
|
||||
}
|
||||
|
||||
/// When a Tenant has a generation, push_layers is always synchronous because
|
||||
/// the ListValidator channel is an unbounded channel.
|
||||
///
|
||||
/// This can be merged into push_layers when we remove the Generation-less mode
|
||||
/// support (`<https://github.com/neondatabase/neon/issues/5395>`)
|
||||
pub(crate) fn push_layers_sync(
|
||||
&self,
|
||||
tenant_shard_id: TenantShardId,
|
||||
timeline_id: TimelineId,
|
||||
current_generation: Generation,
|
||||
layers: Vec<(LayerName, LayerFileMetadata)>,
|
||||
) -> Result<(), DeletionQueueError> {
|
||||
metrics::DELETION_QUEUE
|
||||
.keys_submitted
|
||||
.inc_by(layers.len() as u64);
|
||||
@@ -957,14 +929,12 @@ mod test {
|
||||
|
||||
// File should still be there after we push it to the queue (we haven't pushed enough to flush anything)
|
||||
info!("Pushing");
|
||||
client
|
||||
.push_layers(
|
||||
tenant_shard_id,
|
||||
TIMELINE_ID,
|
||||
now_generation,
|
||||
[(layer_file_name_1.clone(), layer_metadata)].to_vec(),
|
||||
)
|
||||
.await?;
|
||||
client.push_layers(
|
||||
tenant_shard_id,
|
||||
TIMELINE_ID,
|
||||
now_generation,
|
||||
[(layer_file_name_1.clone(), layer_metadata)].to_vec(),
|
||||
)?;
|
||||
assert_remote_files(&[&remote_layer_file_name_1], &remote_timeline_path);
|
||||
|
||||
assert_local_files(&[], &deletion_prefix);
|
||||
@@ -1017,14 +987,12 @@ mod test {
|
||||
assert_remote_files(&[&remote_layer_name], &remote_timeline_path);
|
||||
|
||||
tracing::debug!("Pushing...");
|
||||
client
|
||||
.push_layers(
|
||||
tenant_shard_id,
|
||||
TIMELINE_ID,
|
||||
stale_generation,
|
||||
[(EXAMPLE_LAYER_NAME.clone(), layer_metadata.clone())].to_vec(),
|
||||
)
|
||||
.await?;
|
||||
client.push_layers(
|
||||
tenant_shard_id,
|
||||
TIMELINE_ID,
|
||||
stale_generation,
|
||||
[(EXAMPLE_LAYER_NAME.clone(), layer_metadata.clone())].to_vec(),
|
||||
)?;
|
||||
|
||||
// We enqueued the operation in a stale generation: it should have failed validation
|
||||
tracing::debug!("Flushing...");
|
||||
@@ -1032,14 +1000,12 @@ mod test {
|
||||
assert_remote_files(&[&remote_layer_name], &remote_timeline_path);
|
||||
|
||||
tracing::debug!("Pushing...");
|
||||
client
|
||||
.push_layers(
|
||||
tenant_shard_id,
|
||||
TIMELINE_ID,
|
||||
latest_generation,
|
||||
[(EXAMPLE_LAYER_NAME.clone(), layer_metadata.clone())].to_vec(),
|
||||
)
|
||||
.await?;
|
||||
client.push_layers(
|
||||
tenant_shard_id,
|
||||
TIMELINE_ID,
|
||||
latest_generation,
|
||||
[(EXAMPLE_LAYER_NAME.clone(), layer_metadata.clone())].to_vec(),
|
||||
)?;
|
||||
|
||||
// We enqueued the operation in a fresh generation: it should have passed validation
|
||||
tracing::debug!("Flushing...");
|
||||
@@ -1074,28 +1040,24 @@ mod test {
|
||||
// generation gets that treatment)
|
||||
let remote_layer_file_name_historical =
|
||||
ctx.write_remote_layer(EXAMPLE_LAYER_NAME, layer_generation)?;
|
||||
client
|
||||
.push_layers(
|
||||
tenant_shard_id,
|
||||
TIMELINE_ID,
|
||||
now_generation.previous(),
|
||||
[(EXAMPLE_LAYER_NAME.clone(), layer_metadata.clone())].to_vec(),
|
||||
)
|
||||
.await?;
|
||||
client.push_layers(
|
||||
tenant_shard_id,
|
||||
TIMELINE_ID,
|
||||
now_generation.previous(),
|
||||
[(EXAMPLE_LAYER_NAME.clone(), layer_metadata.clone())].to_vec(),
|
||||
)?;
|
||||
|
||||
// Inject a deletion in the generation before generation_now: after restart,
|
||||
// this deletion should get executed, because we execute deletions in the
|
||||
// immediately previous generation on the same node.
|
||||
let remote_layer_file_name_previous =
|
||||
ctx.write_remote_layer(EXAMPLE_LAYER_NAME_ALT, layer_generation)?;
|
||||
client
|
||||
.push_layers(
|
||||
tenant_shard_id,
|
||||
TIMELINE_ID,
|
||||
now_generation,
|
||||
[(EXAMPLE_LAYER_NAME_ALT.clone(), layer_metadata.clone())].to_vec(),
|
||||
)
|
||||
.await?;
|
||||
client.push_layers(
|
||||
tenant_shard_id,
|
||||
TIMELINE_ID,
|
||||
now_generation,
|
||||
[(EXAMPLE_LAYER_NAME_ALT.clone(), layer_metadata.clone())].to_vec(),
|
||||
)?;
|
||||
|
||||
client.flush().await?;
|
||||
assert_remote_files(
|
||||
@@ -1139,6 +1101,7 @@ pub(crate) mod mock {
|
||||
use tracing::info;
|
||||
|
||||
use super::*;
|
||||
use crate::tenant::remote_timeline_client::remote_layer_path;
|
||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
|
||||
pub struct ConsumerState {
|
||||
|
||||
@@ -61,6 +61,7 @@ use crate::{
|
||||
remote_timeline_client::LayerFileMetadata,
|
||||
secondary::SecondaryTenant,
|
||||
storage_layer::{AsLayerDesc, EvictionError, Layer, LayerName, LayerVisibilityHint},
|
||||
tasks::sleep_random,
|
||||
},
|
||||
CancellableTask, DiskUsageEvictionTask,
|
||||
};
|
||||
@@ -210,14 +211,8 @@ async fn disk_usage_eviction_task(
|
||||
info!("disk usage based eviction task finishing");
|
||||
};
|
||||
|
||||
use crate::tenant::tasks::random_init_delay;
|
||||
{
|
||||
if random_init_delay(task_config.period, &cancel)
|
||||
.await
|
||||
.is_err()
|
||||
{
|
||||
return;
|
||||
}
|
||||
if sleep_random(task_config.period, &cancel).await.is_err() {
|
||||
return;
|
||||
}
|
||||
|
||||
let mut iteration_no = 0;
|
||||
|
||||
@@ -489,7 +489,6 @@ impl timeline::handle::TenantManager<TenantManagerTypes> for TenantManagerWrappe
|
||||
let timeline = tenant_shard
|
||||
.get_timeline(timeline_id, true)
|
||||
.map_err(GetActiveTimelineError::Timeline)?;
|
||||
set_tracing_field_shard_id(&timeline);
|
||||
Ok(timeline)
|
||||
}
|
||||
}
|
||||
@@ -774,11 +773,11 @@ impl PageServerHandler {
|
||||
|
||||
let batched_msg = match neon_fe_msg {
|
||||
PagestreamFeMessage::Exists(req) => {
|
||||
let span = tracing::info_span!(parent: parent_span, "handle_get_rel_exists_request", rel = %req.rel, req_lsn = %req.hdr.request_lsn);
|
||||
let shard = timeline_handles
|
||||
.get(tenant_id, timeline_id, ShardSelector::Zero)
|
||||
.instrument(span.clone()) // sets `shard_id` field
|
||||
.await?;
|
||||
debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
|
||||
let span = tracing::info_span!(parent: &parent_span, "handle_get_rel_exists_request", rel = %req.rel, req_lsn = %req.hdr.request_lsn, shard_id = %shard.tenant_shard_id.shard_slug());
|
||||
let timer = record_op_start_and_throttle(
|
||||
&shard,
|
||||
metrics::SmgrQueryType::GetRelExists,
|
||||
@@ -793,11 +792,10 @@ impl PageServerHandler {
|
||||
}
|
||||
}
|
||||
PagestreamFeMessage::Nblocks(req) => {
|
||||
let span = tracing::info_span!(parent: parent_span, "handle_get_nblocks_request", rel = %req.rel, req_lsn = %req.hdr.request_lsn);
|
||||
let shard = timeline_handles
|
||||
.get(tenant_id, timeline_id, ShardSelector::Zero)
|
||||
.instrument(span.clone()) // sets `shard_id` field
|
||||
.await?;
|
||||
let span = tracing::info_span!(parent: &parent_span, "handle_get_nblocks_request", rel = %req.rel, req_lsn = %req.hdr.request_lsn, shard_id = %shard.tenant_shard_id.shard_slug());
|
||||
let timer = record_op_start_and_throttle(
|
||||
&shard,
|
||||
metrics::SmgrQueryType::GetRelSize,
|
||||
@@ -812,11 +810,10 @@ impl PageServerHandler {
|
||||
}
|
||||
}
|
||||
PagestreamFeMessage::DbSize(req) => {
|
||||
let span = tracing::info_span!(parent: parent_span, "handle_db_size_request", dbnode = %req.dbnode, req_lsn = %req.hdr.request_lsn);
|
||||
let shard = timeline_handles
|
||||
.get(tenant_id, timeline_id, ShardSelector::Zero)
|
||||
.instrument(span.clone()) // sets `shard_id` field
|
||||
.await?;
|
||||
let span = tracing::info_span!(parent: &parent_span, "handle_db_size_request", dbnode = %req.dbnode, req_lsn = %req.hdr.request_lsn, shard_id = %shard.tenant_shard_id.shard_slug());
|
||||
let timer = record_op_start_and_throttle(
|
||||
&shard,
|
||||
metrics::SmgrQueryType::GetDbSize,
|
||||
@@ -831,11 +828,10 @@ impl PageServerHandler {
|
||||
}
|
||||
}
|
||||
PagestreamFeMessage::GetSlruSegment(req) => {
|
||||
let span = tracing::info_span!(parent: parent_span, "handle_get_slru_segment_request", kind = %req.kind, segno = %req.segno, req_lsn = %req.hdr.request_lsn);
|
||||
let shard = timeline_handles
|
||||
.get(tenant_id, timeline_id, ShardSelector::Zero)
|
||||
.instrument(span.clone()) // sets `shard_id` field
|
||||
.await?;
|
||||
let span = tracing::info_span!(parent: &parent_span, "handle_get_slru_segment_request", kind = %req.kind, segno = %req.segno, req_lsn = %req.hdr.request_lsn, shard_id = %shard.tenant_shard_id.shard_slug());
|
||||
let timer = record_op_start_and_throttle(
|
||||
&shard,
|
||||
metrics::SmgrQueryType::GetSlruSegment,
|
||||
@@ -850,12 +846,20 @@ impl PageServerHandler {
|
||||
}
|
||||
}
|
||||
PagestreamFeMessage::GetPage(req) => {
|
||||
let span = tracing::info_span!(parent: parent_span, "handle_get_page_at_lsn_request_batched", req_lsn = %req.hdr.request_lsn);
|
||||
// avoid a somewhat costly Span::record() by constructing the entire span in one go.
|
||||
macro_rules! mkspan {
|
||||
(before shard routing) => {{
|
||||
tracing::info_span!(parent: &parent_span, "handle_get_page_request", rel = %req.rel, blkno = %req.blkno, req_lsn = %req.hdr.request_lsn)
|
||||
}};
|
||||
($shard_id:expr) => {{
|
||||
tracing::info_span!(parent: &parent_span, "handle_get_page_request", rel = %req.rel, blkno = %req.blkno, req_lsn = %req.hdr.request_lsn, shard_id = %$shard_id)
|
||||
}};
|
||||
}
|
||||
|
||||
macro_rules! respond_error {
|
||||
($error:expr) => {{
|
||||
($span:expr, $error:expr) => {{
|
||||
let error = BatchedFeMessage::RespondError {
|
||||
span,
|
||||
span: $span,
|
||||
error: BatchedPageStreamError {
|
||||
req: req.hdr,
|
||||
err: $error,
|
||||
@@ -868,27 +872,35 @@ impl PageServerHandler {
|
||||
let key = rel_block_to_key(req.rel, req.blkno);
|
||||
let shard = match timeline_handles
|
||||
.get(tenant_id, timeline_id, ShardSelector::Page(key))
|
||||
.instrument(span.clone()) // sets `shard_id` field
|
||||
.await
|
||||
{
|
||||
Ok(tl) => tl,
|
||||
Err(GetActiveTimelineError::Tenant(GetActiveTenantError::NotFound(_))) => {
|
||||
// We already know this tenant exists in general, because we resolved it at
|
||||
// start of connection. Getting a NotFound here indicates that the shard containing
|
||||
// the requested page is not present on this node: the client's knowledge of shard->pageserver
|
||||
// mapping is out of date.
|
||||
//
|
||||
// Closing the connection by returning ``::Reconnect` has the side effect of rate-limiting above message, via
|
||||
// client's reconnect backoff, as well as hopefully prompting the client to load its updated configuration
|
||||
// and talk to a different pageserver.
|
||||
return respond_error!(PageStreamError::Reconnect(
|
||||
"getpage@lsn request routed to wrong shard".into()
|
||||
));
|
||||
}
|
||||
Err(e) => {
|
||||
return respond_error!(e.into());
|
||||
let span = mkspan!(before shard routing);
|
||||
match e {
|
||||
GetActiveTimelineError::Tenant(GetActiveTenantError::NotFound(_)) => {
|
||||
// We already know this tenant exists in general, because we resolved it at
|
||||
// start of connection. Getting a NotFound here indicates that the shard containing
|
||||
// the requested page is not present on this node: the client's knowledge of shard->pageserver
|
||||
// mapping is out of date.
|
||||
//
|
||||
// Closing the connection by returning ``::Reconnect` has the side effect of rate-limiting above message, via
|
||||
// client's reconnect backoff, as well as hopefully prompting the client to load its updated configuration
|
||||
// and talk to a different pageserver.
|
||||
return respond_error!(
|
||||
span,
|
||||
PageStreamError::Reconnect(
|
||||
"getpage@lsn request routed to wrong shard".into()
|
||||
)
|
||||
);
|
||||
}
|
||||
e => {
|
||||
return respond_error!(span, e.into());
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
let span = mkspan!(shard.tenant_shard_id.shard_slug());
|
||||
|
||||
let timer = record_op_start_and_throttle(
|
||||
&shard,
|
||||
@@ -910,7 +922,7 @@ impl PageServerHandler {
|
||||
{
|
||||
Ok(lsn) => lsn,
|
||||
Err(e) => {
|
||||
return respond_error!(e);
|
||||
return respond_error!(span, e);
|
||||
}
|
||||
};
|
||||
BatchedFeMessage::GetPage {
|
||||
@@ -922,11 +934,10 @@ impl PageServerHandler {
|
||||
}
|
||||
#[cfg(feature = "testing")]
|
||||
PagestreamFeMessage::Test(req) => {
|
||||
let span = tracing::info_span!(parent: parent_span, "handle_test_request");
|
||||
let shard = timeline_handles
|
||||
.get(tenant_id, timeline_id, ShardSelector::Zero)
|
||||
.instrument(span.clone()) // sets `shard_id` field
|
||||
.await?;
|
||||
let span = tracing::info_span!(parent: &parent_span, "handle_test_request", shard_id = %shard.tenant_shard_id.shard_slug());
|
||||
let timer =
|
||||
record_op_start_and_throttle(&shard, metrics::SmgrQueryType::Test, received_at)
|
||||
.await?;
|
||||
@@ -1190,6 +1201,29 @@ impl PageServerHandler {
|
||||
}
|
||||
};
|
||||
|
||||
// We purposefully don't count flush time into the smgr operaiton timer.
|
||||
//
|
||||
// The reason is that current compute client will not perform protocol processing
|
||||
// if the postgres backend process is doing things other than `->smgr_read()`.
|
||||
// This is especially the case for prefetch.
|
||||
//
|
||||
// If the compute doesn't read from the connection, eventually TCP will backpressure
|
||||
// all the way into our flush call below.
|
||||
//
|
||||
// The timer's underlying metric is used for a storage-internal latency SLO and
|
||||
// we don't want to include latency in it that we can't control.
|
||||
// And as pointed out above, in this case, we don't control the time that flush will take.
|
||||
//
|
||||
// We put each response in the batch onto the wire in a separate pgb_writer.flush()
|
||||
// call, which (all unmeasured) adds syscall overhead but reduces time to first byte
|
||||
// and avoids building up a "giant" contiguous userspace buffer to hold the entire response.
|
||||
// TODO: vectored socket IO would be great, but pgb_writer doesn't support that.
|
||||
//
|
||||
// Since we're flushing multiple times in the loop, but only have access to the per-op
|
||||
// timers inside the loop, we capture the flush start time here and reuse it to finish
|
||||
// each op timer.
|
||||
let flushing_start_time = Instant::now();
|
||||
|
||||
// Map handler result to protocol behavior.
|
||||
// Some handler errors cause exit from pagestream protocol.
|
||||
// Other handler errors are sent back as an error message and we stay in pagestream protocol.
|
||||
@@ -1238,21 +1272,9 @@ impl PageServerHandler {
|
||||
&response_msg.serialize(protocol_version),
|
||||
))?;
|
||||
|
||||
// We purposefully don't count flush time into the timer.
|
||||
//
|
||||
// The reason is that current compute client will not perform protocol processing
|
||||
// if the postgres backend process is doing things other than `->smgr_read()`.
|
||||
// This is especially the case for prefetch.
|
||||
//
|
||||
// If the compute doesn't read from the connection, eventually TCP will backpressure
|
||||
// all the way into our flush call below.
|
||||
//
|
||||
// The timer's underlying metric is used for a storage-internal latency SLO and
|
||||
// we don't want to include latency in it that we can't control.
|
||||
// And as pointed out above, in this case, we don't control the time that flush will take.
|
||||
let flushing_timer = timer.map(|mut timer| {
|
||||
timer
|
||||
.observe_execution_end_flush_start(Instant::now())
|
||||
.observe_execution_end_flush_start(flushing_start_time)
|
||||
.expect("we are the first caller")
|
||||
});
|
||||
|
||||
@@ -1340,7 +1362,7 @@ impl PageServerHandler {
|
||||
.take()
|
||||
.expect("implementation error: timeline_handles should not be locked");
|
||||
|
||||
let request_span = info_span!("request", shard_id = tracing::field::Empty);
|
||||
let request_span = info_span!("request");
|
||||
let ((pgb_reader, timeline_handles), result) = match self.pipelining_config.clone() {
|
||||
PageServicePipeliningConfig::Pipelined(pipelining_config) => {
|
||||
self.handle_pagerequests_pipelined(
|
||||
@@ -2034,6 +2056,7 @@ impl PageServerHandler {
|
||||
.unwrap()
|
||||
.get(tenant_id, timeline_id, ShardSelector::Zero)
|
||||
.await?;
|
||||
set_tracing_field_shard_id(&timeline);
|
||||
|
||||
if timeline.is_archived() == Some(true) {
|
||||
// TODO after a grace period, turn this log line into a hard error
|
||||
|
||||
@@ -328,8 +328,8 @@ pub enum TaskKind {
|
||||
// Eviction. One per timeline.
|
||||
Eviction,
|
||||
|
||||
// Ingest housekeeping (flushing ephemeral layers on time threshold or disk pressure)
|
||||
IngestHousekeeping,
|
||||
// Tenant housekeeping (flush idle ephemeral layers, shut down idle walredo, etc.).
|
||||
TenantHousekeeping,
|
||||
|
||||
/// See [`crate::disk_usage_eviction_task`].
|
||||
DiskUsageEviction,
|
||||
|
||||
@@ -20,6 +20,7 @@ use chrono::NaiveDateTime;
|
||||
use enumset::EnumSet;
|
||||
use futures::stream::FuturesUnordered;
|
||||
use futures::StreamExt;
|
||||
use itertools::Itertools as _;
|
||||
use pageserver_api::models;
|
||||
use pageserver_api::models::CompactInfoResponse;
|
||||
use pageserver_api::models::LsnLease;
|
||||
@@ -55,6 +56,7 @@ use timeline::CompactOptions;
|
||||
use timeline::ShutdownMode;
|
||||
use tokio::io::BufReader;
|
||||
use tokio::sync::watch;
|
||||
use tokio::sync::Notify;
|
||||
use tokio::task::JoinSet;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::*;
|
||||
@@ -349,6 +351,9 @@ pub struct Tenant {
|
||||
/// Overhead of mutex is acceptable because compaction is done with a multi-second period.
|
||||
compaction_circuit_breaker: std::sync::Mutex<CircuitBreaker>,
|
||||
|
||||
/// Signals the tenant compaction loop that there is L0 compaction work to be done.
|
||||
pub(crate) l0_compaction_trigger: Arc<Notify>,
|
||||
|
||||
/// Scheduled gc-compaction tasks.
|
||||
scheduled_compaction_tasks: std::sync::Mutex<HashMap<TimelineId, Arc<GcCompactionQueue>>>,
|
||||
|
||||
@@ -1690,12 +1695,7 @@ impl Tenant {
|
||||
timeline_id,
|
||||
index_part,
|
||||
remote_metadata,
|
||||
TimelineResources {
|
||||
remote_client,
|
||||
pagestream_throttle: self.pagestream_throttle.clone(),
|
||||
pagestream_throttle_metrics: self.pagestream_throttle_metrics.clone(),
|
||||
l0_flush_global_state: self.l0_flush_global_state.clone(),
|
||||
},
|
||||
self.get_timeline_resources_for(remote_client),
|
||||
LoadTimelineCause::Attach,
|
||||
ctx,
|
||||
)
|
||||
@@ -3088,32 +3088,28 @@ impl Tenant {
|
||||
Ok(rx)
|
||||
}
|
||||
|
||||
// Call through to all timelines to freeze ephemeral layers if needed. Usually
|
||||
// this happens during ingest: this background housekeeping is for freezing layers
|
||||
// that are open but haven't been written to for some time.
|
||||
async fn ingest_housekeeping(&self) {
|
||||
// Scan through the hashmap and collect a list of all the timelines,
|
||||
// while holding the lock. Then drop the lock and actually perform the
|
||||
// compactions. We don't want to block everything else while the
|
||||
// compaction runs.
|
||||
let timelines = {
|
||||
self.timelines
|
||||
.lock()
|
||||
.unwrap()
|
||||
.values()
|
||||
.filter_map(|timeline| {
|
||||
if timeline.is_active() {
|
||||
Some(timeline.clone())
|
||||
} else {
|
||||
None
|
||||
}
|
||||
})
|
||||
.collect::<Vec<_>>()
|
||||
};
|
||||
/// Performs periodic housekeeping, via the tenant housekeeping background task.
|
||||
async fn housekeeping(&self) {
|
||||
// Call through to all timelines to freeze ephemeral layers as needed. This usually happens
|
||||
// during ingest, but we don't want idle timelines to hold open layers for too long.
|
||||
let timelines = self
|
||||
.timelines
|
||||
.lock()
|
||||
.unwrap()
|
||||
.values()
|
||||
.filter(|tli| tli.is_active())
|
||||
.cloned()
|
||||
.collect_vec();
|
||||
|
||||
for timeline in &timelines {
|
||||
for timeline in timelines {
|
||||
timeline.maybe_freeze_ephemeral_layer().await;
|
||||
}
|
||||
|
||||
// Shut down walredo if idle.
|
||||
const WALREDO_IDLE_TIMEOUT: Duration = Duration::from_secs(180);
|
||||
if let Some(ref walredo_mgr) = self.walredo_mgr {
|
||||
walredo_mgr.maybe_quiesce(WALREDO_IDLE_TIMEOUT);
|
||||
}
|
||||
}
|
||||
|
||||
pub fn timeline_has_no_attached_children(&self, timeline_id: TimelineId) -> bool {
|
||||
@@ -4115,6 +4111,7 @@ impl Tenant {
|
||||
// use an extremely long backoff.
|
||||
Some(Duration::from_secs(3600 * 24)),
|
||||
)),
|
||||
l0_compaction_trigger: Arc::new(Notify::new()),
|
||||
scheduled_compaction_tasks: Mutex::new(Default::default()),
|
||||
activate_now_sem: tokio::sync::Semaphore::new(0),
|
||||
attach_wal_lag_cooldown: Arc::new(std::sync::OnceLock::new()),
|
||||
@@ -5023,12 +5020,19 @@ impl Tenant {
|
||||
)
|
||||
}
|
||||
|
||||
/// Call this before constructing a timeline, to build its required structures
|
||||
/// Builds required resources for a new timeline.
|
||||
fn build_timeline_resources(&self, timeline_id: TimelineId) -> TimelineResources {
|
||||
let remote_client = self.build_timeline_remote_client(timeline_id);
|
||||
self.get_timeline_resources_for(remote_client)
|
||||
}
|
||||
|
||||
/// Builds timeline resources for the given remote client.
|
||||
fn get_timeline_resources_for(&self, remote_client: RemoteTimelineClient) -> TimelineResources {
|
||||
TimelineResources {
|
||||
remote_client: self.build_timeline_remote_client(timeline_id),
|
||||
remote_client,
|
||||
pagestream_throttle: self.pagestream_throttle.clone(),
|
||||
pagestream_throttle_metrics: self.pagestream_throttle_metrics.clone(),
|
||||
l0_compaction_trigger: self.l0_compaction_trigger.clone(),
|
||||
l0_flush_global_state: self.l0_flush_global_state.clone(),
|
||||
}
|
||||
}
|
||||
@@ -7701,6 +7705,18 @@ mod tests {
|
||||
}
|
||||
|
||||
tline.freeze_and_flush().await?;
|
||||
// Force layers to L1
|
||||
tline
|
||||
.compact(
|
||||
&cancel,
|
||||
{
|
||||
let mut flags = EnumSet::new();
|
||||
flags.insert(CompactFlags::ForceL0Compaction);
|
||||
flags
|
||||
},
|
||||
&ctx,
|
||||
)
|
||||
.await?;
|
||||
|
||||
if iter % 5 == 0 {
|
||||
let (_, before_delta_file_accessed) =
|
||||
@@ -7713,6 +7729,7 @@ mod tests {
|
||||
let mut flags = EnumSet::new();
|
||||
flags.insert(CompactFlags::ForceImageLayerCreation);
|
||||
flags.insert(CompactFlags::ForceRepartition);
|
||||
flags.insert(CompactFlags::ForceL0Compaction);
|
||||
flags
|
||||
},
|
||||
&ctx,
|
||||
@@ -8159,6 +8176,8 @@ mod tests {
|
||||
|
||||
let cancel = CancellationToken::new();
|
||||
|
||||
// Image layer creation happens on the disk_consistent_lsn so we need to force set it now.
|
||||
tline.force_set_disk_consistent_lsn(Lsn(0x40));
|
||||
tline
|
||||
.compact(
|
||||
&cancel,
|
||||
@@ -8172,8 +8191,7 @@ mod tests {
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// Image layers are created at last_record_lsn
|
||||
// Image layers are created at repartition LSN
|
||||
let images = tline
|
||||
.inspect_image_layers(Lsn(0x40), &ctx, io_concurrency.clone())
|
||||
.await
|
||||
|
||||
@@ -517,7 +517,7 @@ impl RemoteTimelineClient {
|
||||
if let Ok(queue) = queue_locked.initialized_mut() {
|
||||
let blocked_deletions = std::mem::take(&mut queue.blocked_deletions);
|
||||
for d in blocked_deletions {
|
||||
if let Err(e) = self.deletion_queue_client.push_layers_sync(
|
||||
if let Err(e) = self.deletion_queue_client.push_layers(
|
||||
self.tenant_shard_id,
|
||||
self.timeline_id,
|
||||
self.generation,
|
||||
@@ -2151,7 +2151,6 @@ impl RemoteTimelineClient {
|
||||
self.generation,
|
||||
delete.layers.clone(),
|
||||
)
|
||||
.await
|
||||
.map_err(|e| anyhow::anyhow!(e))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -44,7 +44,7 @@ pub(crate) use layer::{EvictionError, Layer, ResidentLayer};
|
||||
|
||||
use self::inmemory_layer::InMemoryLayerFileId;
|
||||
|
||||
use super::timeline::GetVectoredError;
|
||||
use super::timeline::{GetVectoredError, ReadPath};
|
||||
use super::PageReconstructError;
|
||||
|
||||
pub fn range_overlaps<T>(a: &Range<T>, b: &Range<T>) -> bool
|
||||
@@ -262,6 +262,8 @@ pub(crate) struct ValuesReconstructState {
|
||||
|
||||
pub(crate) io_concurrency: IoConcurrency,
|
||||
num_active_ios: Arc<AtomicUsize>,
|
||||
|
||||
pub(crate) read_path: Option<ReadPath>,
|
||||
}
|
||||
|
||||
/// The level of IO concurrency to be used on the read path
|
||||
@@ -609,6 +611,7 @@ impl ValuesReconstructState {
|
||||
delta_layers_visited: 0,
|
||||
io_concurrency,
|
||||
num_active_ios: Arc::new(AtomicUsize::new(0)),
|
||||
read_path: None,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1,48 +1,65 @@
|
||||
//! This module contains functions to serve per-tenant background processes,
|
||||
//! such as compaction and GC
|
||||
//! This module contains per-tenant background processes, e.g. compaction and GC.
|
||||
|
||||
use std::ops::ControlFlow;
|
||||
use std::str::FromStr;
|
||||
use std::cmp::max;
|
||||
use std::future::Future;
|
||||
use std::ops::{ControlFlow, RangeInclusive};
|
||||
use std::pin::pin;
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use once_cell::sync::Lazy;
|
||||
use rand::Rng;
|
||||
use scopeguard::defer;
|
||||
use tokio::sync::{Semaphore, SemaphorePermit};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::*;
|
||||
|
||||
use crate::context::{DownloadBehavior, RequestContext};
|
||||
use crate::metrics::{BackgroundLoopSemaphoreMetricsRecorder, TENANT_TASK_EVENTS};
|
||||
use crate::task_mgr;
|
||||
use crate::task_mgr::{TaskKind, BACKGROUND_RUNTIME};
|
||||
use crate::task_mgr::{self, TaskKind, BACKGROUND_RUNTIME, TOKIO_WORKER_THREADS};
|
||||
use crate::tenant::throttle::Stats;
|
||||
use crate::tenant::timeline::compaction::CompactionOutcome;
|
||||
use crate::tenant::timeline::CompactionError;
|
||||
use crate::tenant::{Tenant, TenantState};
|
||||
use once_cell::sync::Lazy;
|
||||
use rand::Rng;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::*;
|
||||
use pageserver_api::config::tenant_conf_defaults::DEFAULT_COMPACTION_PERIOD;
|
||||
use utils::backoff::exponential_backoff_duration;
|
||||
use utils::completion::Barrier;
|
||||
use utils::pausable_failpoint;
|
||||
use utils::rate_limit::RateLimit;
|
||||
use utils::{backoff, completion, pausable_failpoint};
|
||||
|
||||
static CONCURRENT_BACKGROUND_TASKS: once_cell::sync::Lazy<tokio::sync::Semaphore> =
|
||||
once_cell::sync::Lazy::new(|| {
|
||||
let total_threads = task_mgr::TOKIO_WORKER_THREADS.get();
|
||||
let permits = usize::max(
|
||||
1,
|
||||
// while a lot of the work is done on spawn_blocking, we still do
|
||||
// repartitioning in the async context. this should give leave us some workers
|
||||
// unblocked to be blocked on other work, hopefully easing any outside visible
|
||||
// effects of restarts.
|
||||
//
|
||||
// 6/8 is a guess; previously we ran with unlimited 8 and more from
|
||||
// spawn_blocking.
|
||||
(total_threads * 3).checked_div(4).unwrap_or(0),
|
||||
);
|
||||
assert_ne!(permits, 0, "we will not be adding in permits later");
|
||||
assert!(
|
||||
permits < total_threads,
|
||||
"need threads avail for shorter work"
|
||||
);
|
||||
tokio::sync::Semaphore::new(permits)
|
||||
});
|
||||
/// Semaphore limiting concurrent background tasks (across all tenants).
|
||||
///
|
||||
/// We use 3/4 Tokio threads, to avoid blocking all threads in case we do any CPU-heavy work.
|
||||
static CONCURRENT_BACKGROUND_TASKS: Lazy<Semaphore> = Lazy::new(|| {
|
||||
let total_threads = TOKIO_WORKER_THREADS.get();
|
||||
let permits = max(1, (total_threads * 3).checked_div(4).unwrap_or(0));
|
||||
assert_ne!(permits, 0, "we will not be adding in permits later");
|
||||
assert!(permits < total_threads, "need threads for other work");
|
||||
Semaphore::new(permits)
|
||||
});
|
||||
|
||||
/// Semaphore limiting concurrent compaction tasks (across all tenants). This is disabled by
|
||||
/// default, see `use_compaction_semaphore`.
|
||||
///
|
||||
/// We use 3/4 Tokio threads, to avoid blocking all threads in case we do any CPU-heavy work.
|
||||
///
|
||||
/// This is a separate semaphore from background tasks, because L0 compaction needs to be responsive
|
||||
/// to avoid high read amp during heavy write workloads.
|
||||
///
|
||||
/// TODO: split image compaction and L0 compaction, and move image compaction to background tasks.
|
||||
/// Only L0 compaction needs to be responsive, and it shouldn't block on image compaction.
|
||||
static CONCURRENT_COMPACTION_TASKS: Lazy<Semaphore> = Lazy::new(|| {
|
||||
let total_threads = TOKIO_WORKER_THREADS.get();
|
||||
let permits = max(1, (total_threads * 3).checked_div(4).unwrap_or(0));
|
||||
assert_ne!(permits, 0, "we will not be adding in permits later");
|
||||
assert!(permits < total_threads, "need threads for other work");
|
||||
Semaphore::new(permits)
|
||||
});
|
||||
|
||||
/// Background jobs.
|
||||
///
|
||||
/// NB: not all of these acquire a CONCURRENT_BACKGROUND_TASKS semaphore permit, only the ones that
|
||||
/// do any significant IO.
|
||||
#[derive(
|
||||
Debug,
|
||||
PartialEq,
|
||||
@@ -58,7 +75,7 @@ pub(crate) enum BackgroundLoopKind {
|
||||
Compaction,
|
||||
Gc,
|
||||
Eviction,
|
||||
IngestHouseKeeping,
|
||||
TenantHouseKeeping,
|
||||
ConsumptionMetricsCollectMetrics,
|
||||
ConsumptionMetricsSyntheticSizeWorker,
|
||||
InitialLogicalSizeCalculation,
|
||||
@@ -67,13 +84,14 @@ pub(crate) enum BackgroundLoopKind {
|
||||
}
|
||||
|
||||
pub struct BackgroundLoopSemaphorePermit<'a> {
|
||||
_permit: tokio::sync::SemaphorePermit<'static>,
|
||||
_permit: SemaphorePermit<'static>,
|
||||
_recorder: BackgroundLoopSemaphoreMetricsRecorder<'a>,
|
||||
}
|
||||
|
||||
/// Cancellation safe.
|
||||
pub(crate) async fn concurrent_background_tasks_rate_limit_permit(
|
||||
/// Acquires a semaphore permit, to limit concurrent background jobs.
|
||||
pub(crate) async fn acquire_concurrency_permit(
|
||||
loop_kind: BackgroundLoopKind,
|
||||
use_compaction_semaphore: bool,
|
||||
_ctx: &RequestContext,
|
||||
) -> BackgroundLoopSemaphorePermit<'static> {
|
||||
// TODO: use a lower threshold and remove the pacer once we resolve some blockage.
|
||||
@@ -88,10 +106,13 @@ pub(crate) async fn concurrent_background_tasks_rate_limit_permit(
|
||||
}
|
||||
|
||||
// TODO: assert that we run on BACKGROUND_RUNTIME; requires tokio_unstable Handle::id();
|
||||
let permit = CONCURRENT_BACKGROUND_TASKS
|
||||
.acquire()
|
||||
.await
|
||||
.expect("should never close");
|
||||
let permit = if loop_kind == BackgroundLoopKind::Compaction && use_compaction_semaphore {
|
||||
CONCURRENT_COMPACTION_TASKS.acquire().await
|
||||
} else {
|
||||
assert!(!use_compaction_semaphore);
|
||||
CONCURRENT_BACKGROUND_TASKS.acquire().await
|
||||
}
|
||||
.expect("should never close");
|
||||
|
||||
let waited = recorder.acquired();
|
||||
if waited >= WARN_THRESHOLD {
|
||||
@@ -108,12 +129,10 @@ pub(crate) async fn concurrent_background_tasks_rate_limit_permit(
|
||||
}
|
||||
}
|
||||
|
||||
/// Start per tenant background loops: compaction and gc.
|
||||
pub fn start_background_loops(
|
||||
tenant: &Arc<Tenant>,
|
||||
background_jobs_can_start: Option<&completion::Barrier>,
|
||||
) {
|
||||
/// Start per tenant background loops: compaction, GC, and ingest housekeeping.
|
||||
pub fn start_background_loops(tenant: &Arc<Tenant>, can_start: Option<&Barrier>) {
|
||||
let tenant_shard_id = tenant.tenant_shard_id;
|
||||
|
||||
task_mgr::spawn(
|
||||
BACKGROUND_RUNTIME.handle(),
|
||||
TaskKind::Compaction,
|
||||
@@ -122,13 +141,15 @@ pub fn start_background_loops(
|
||||
&format!("compactor for tenant {tenant_shard_id}"),
|
||||
{
|
||||
let tenant = Arc::clone(tenant);
|
||||
let background_jobs_can_start = background_jobs_can_start.cloned();
|
||||
let can_start = can_start.cloned();
|
||||
async move {
|
||||
let cancel = task_mgr::shutdown_token();
|
||||
let cancel = task_mgr::shutdown_token(); // NB: must be in async context
|
||||
tokio::select! {
|
||||
_ = cancel.cancelled() => { return Ok(()) },
|
||||
_ = completion::Barrier::maybe_wait(background_jobs_can_start) => {}
|
||||
_ = cancel.cancelled() => return Ok(()),
|
||||
_ = Barrier::maybe_wait(can_start) => {}
|
||||
};
|
||||
TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
|
||||
defer!(TENANT_TASK_EVENTS.with_label_values(&["stop"]).inc());
|
||||
compaction_loop(tenant, cancel)
|
||||
// If you rename this span, change the RUST_LOG env variable in test_runner/performance/test_branch_creation.py
|
||||
.instrument(info_span!("compaction_loop", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug()))
|
||||
@@ -137,6 +158,7 @@ pub fn start_background_loops(
|
||||
}
|
||||
},
|
||||
);
|
||||
|
||||
task_mgr::spawn(
|
||||
BACKGROUND_RUNTIME.handle(),
|
||||
TaskKind::GarbageCollector,
|
||||
@@ -145,13 +167,15 @@ pub fn start_background_loops(
|
||||
&format!("garbage collector for tenant {tenant_shard_id}"),
|
||||
{
|
||||
let tenant = Arc::clone(tenant);
|
||||
let background_jobs_can_start = background_jobs_can_start.cloned();
|
||||
let can_start = can_start.cloned();
|
||||
async move {
|
||||
let cancel = task_mgr::shutdown_token();
|
||||
let cancel = task_mgr::shutdown_token(); // NB: must be in async context
|
||||
tokio::select! {
|
||||
_ = cancel.cancelled() => { return Ok(()) },
|
||||
_ = completion::Barrier::maybe_wait(background_jobs_can_start) => {}
|
||||
_ = cancel.cancelled() => return Ok(()),
|
||||
_ = Barrier::maybe_wait(can_start) => {}
|
||||
};
|
||||
TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
|
||||
defer!(TENANT_TASK_EVENTS.with_label_values(&["stop"]).inc());
|
||||
gc_loop(tenant, cancel)
|
||||
.instrument(info_span!("gc_loop", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug()))
|
||||
.await;
|
||||
@@ -162,21 +186,23 @@ pub fn start_background_loops(
|
||||
|
||||
task_mgr::spawn(
|
||||
BACKGROUND_RUNTIME.handle(),
|
||||
TaskKind::IngestHousekeeping,
|
||||
TaskKind::TenantHousekeeping,
|
||||
tenant_shard_id,
|
||||
None,
|
||||
&format!("ingest housekeeping for tenant {tenant_shard_id}"),
|
||||
&format!("housekeeping for tenant {tenant_shard_id}"),
|
||||
{
|
||||
let tenant = Arc::clone(tenant);
|
||||
let background_jobs_can_start = background_jobs_can_start.cloned();
|
||||
let can_start = can_start.cloned();
|
||||
async move {
|
||||
let cancel = task_mgr::shutdown_token();
|
||||
let cancel = task_mgr::shutdown_token(); // NB: must be in async context
|
||||
tokio::select! {
|
||||
_ = cancel.cancelled() => { return Ok(()) },
|
||||
_ = completion::Barrier::maybe_wait(background_jobs_can_start) => {}
|
||||
_ = cancel.cancelled() => return Ok(()),
|
||||
_ = Barrier::maybe_wait(can_start) => {}
|
||||
};
|
||||
ingest_housekeeping_loop(tenant, cancel)
|
||||
.instrument(info_span!("ingest_housekeeping_loop", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug()))
|
||||
TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
|
||||
defer!(TENANT_TASK_EVENTS.with_label_values(&["stop"]).inc());
|
||||
tenant_housekeeping_loop(tenant, cancel)
|
||||
.instrument(info_span!("tenant_housekeeping_loop", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug()))
|
||||
.await;
|
||||
Ok(())
|
||||
}
|
||||
@@ -184,372 +210,293 @@ pub fn start_background_loops(
|
||||
);
|
||||
}
|
||||
|
||||
///
|
||||
/// Compaction task's main loop
|
||||
///
|
||||
/// Compaction task's main loop.
|
||||
async fn compaction_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
|
||||
const BASE_BACKOFF_SECS: f64 = 1.0;
|
||||
const MAX_BACKOFF_SECS: f64 = 300.0;
|
||||
// How many errors we have seen consequtively
|
||||
let mut error_run_count = 0;
|
||||
const RECHECK_CONFIG_INTERVAL: Duration = Duration::from_secs(10);
|
||||
|
||||
TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
|
||||
async {
|
||||
let ctx = RequestContext::todo_child(TaskKind::Compaction, DownloadBehavior::Download);
|
||||
let mut first = true;
|
||||
loop {
|
||||
let ctx = RequestContext::todo_child(TaskKind::Compaction, DownloadBehavior::Download);
|
||||
let mut period = tenant.get_compaction_period();
|
||||
let mut error_run = 0; // consecutive errors
|
||||
|
||||
// Stagger the compaction loop across tenants.
|
||||
if wait_for_active_tenant(&tenant, &cancel).await.is_break() {
|
||||
return;
|
||||
}
|
||||
if sleep_random(period, &cancel).await.is_err() {
|
||||
return;
|
||||
}
|
||||
|
||||
loop {
|
||||
// Recheck that we're still active.
|
||||
if wait_for_active_tenant(&tenant, &cancel).await.is_break() {
|
||||
return;
|
||||
}
|
||||
|
||||
// Refresh the period. If compaction is disabled, check again in a bit.
|
||||
period = tenant.get_compaction_period();
|
||||
if period == Duration::ZERO {
|
||||
#[cfg(not(feature = "testing"))]
|
||||
info!("automatic compaction is disabled");
|
||||
tokio::select! {
|
||||
_ = cancel.cancelled() => {
|
||||
return;
|
||||
},
|
||||
tenant_wait_result = wait_for_active_tenant(&tenant) => match tenant_wait_result {
|
||||
ControlFlow::Break(()) => return,
|
||||
ControlFlow::Continue(()) => (),
|
||||
},
|
||||
_ = tokio::time::sleep(RECHECK_CONFIG_INTERVAL) => {},
|
||||
_ = cancel.cancelled() => return,
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
let period = tenant.get_compaction_period();
|
||||
// Wait for the next compaction run.
|
||||
let backoff = exponential_backoff_duration(error_run, BASE_BACKOFF_SECS, MAX_BACKOFF_SECS);
|
||||
tokio::select! {
|
||||
_ = tokio::time::sleep(backoff), if error_run > 0 => {},
|
||||
_ = tokio::time::sleep(period), if error_run == 0 => {},
|
||||
_ = tenant.l0_compaction_trigger.notified(), if error_run == 0 => {},
|
||||
_ = cancel.cancelled() => return,
|
||||
}
|
||||
|
||||
// TODO: we shouldn't need to await to find tenant and this could be moved outside of
|
||||
// loop, #3501. There are also additional "allowed_errors" in tests.
|
||||
if first {
|
||||
first = false;
|
||||
if random_init_delay(period, &cancel).await.is_err() {
|
||||
break;
|
||||
// Run compaction.
|
||||
let iteration = Iteration {
|
||||
started_at: Instant::now(),
|
||||
period,
|
||||
kind: BackgroundLoopKind::Compaction,
|
||||
};
|
||||
let IterationResult { output, elapsed } = iteration
|
||||
.run(tenant.compaction_iteration(&cancel, &ctx))
|
||||
.await;
|
||||
|
||||
match output {
|
||||
Ok(outcome) => {
|
||||
error_run = 0;
|
||||
// If there's more compaction work pending, reschedule immediately. This isn't
|
||||
// necessarily L0 compaction, but that's fine for now.
|
||||
//
|
||||
// TODO: differentiate between L0 compaction and other compaction. The former needs
|
||||
// to be responsive, the latter doesn't.
|
||||
if outcome == CompactionOutcome::Pending {
|
||||
tenant.l0_compaction_trigger.notify_one();
|
||||
}
|
||||
}
|
||||
|
||||
let sleep_duration;
|
||||
if period == Duration::ZERO {
|
||||
#[cfg(not(feature = "testing"))]
|
||||
info!("automatic compaction is disabled");
|
||||
// check again in 10 seconds, in case it's been enabled again.
|
||||
sleep_duration = Duration::from_secs(10)
|
||||
} else {
|
||||
let iteration = Iteration {
|
||||
started_at: Instant::now(),
|
||||
period,
|
||||
kind: BackgroundLoopKind::Compaction,
|
||||
};
|
||||
|
||||
// Run compaction
|
||||
let IterationResult { output, elapsed } = iteration
|
||||
.run(tenant.compaction_iteration(&cancel, &ctx))
|
||||
.await;
|
||||
match output {
|
||||
Ok(outcome) => {
|
||||
error_run_count = 0;
|
||||
// schedule the next compaction immediately in case there is a pending compaction task
|
||||
sleep_duration = if let CompactionOutcome::Pending = outcome {
|
||||
Duration::from_secs(1)
|
||||
} else {
|
||||
period
|
||||
};
|
||||
}
|
||||
Err(e) => {
|
||||
let wait_duration = backoff::exponential_backoff_duration_seconds(
|
||||
error_run_count + 1,
|
||||
1.0,
|
||||
MAX_BACKOFF_SECS,
|
||||
);
|
||||
error_run_count += 1;
|
||||
let wait_duration = Duration::from_secs_f64(wait_duration);
|
||||
log_compaction_error(
|
||||
&e,
|
||||
error_run_count,
|
||||
&wait_duration,
|
||||
cancel.is_cancelled(),
|
||||
);
|
||||
sleep_duration = wait_duration;
|
||||
}
|
||||
}
|
||||
|
||||
// the duration is recorded by performance tests by enabling debug in this function
|
||||
tracing::debug!(
|
||||
elapsed_ms = elapsed.as_millis(),
|
||||
"compaction iteration complete"
|
||||
);
|
||||
};
|
||||
|
||||
// Perhaps we did no work and the walredo process has been idle for some time:
|
||||
// give it a chance to shut down to avoid leaving walredo process running indefinitely.
|
||||
// TODO: move this to a separate task (housekeeping loop) that isn't affected by the back-off,
|
||||
// so we get some upper bound guarantee on when walredo quiesce / this throttling reporting here happens.
|
||||
if let Some(walredo_mgr) = &tenant.walredo_mgr {
|
||||
walredo_mgr.maybe_quiesce(period * 10);
|
||||
}
|
||||
|
||||
// Sleep
|
||||
if tokio::time::timeout(sleep_duration, cancel.cancelled())
|
||||
.await
|
||||
.is_ok()
|
||||
{
|
||||
break;
|
||||
Err(err) => {
|
||||
error_run += 1;
|
||||
let backoff =
|
||||
exponential_backoff_duration(error_run, BASE_BACKOFF_SECS, MAX_BACKOFF_SECS);
|
||||
log_compaction_error(&err, error_run, backoff, cancel.is_cancelled());
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
// NB: this log entry is recorded by performance tests.
|
||||
debug!(
|
||||
elapsed_ms = elapsed.as_millis(),
|
||||
"compaction iteration complete"
|
||||
);
|
||||
}
|
||||
.await;
|
||||
TENANT_TASK_EVENTS.with_label_values(&["stop"]).inc();
|
||||
}
|
||||
|
||||
fn log_compaction_error(
|
||||
e: &CompactionError,
|
||||
error_run_count: u32,
|
||||
sleep_duration: &std::time::Duration,
|
||||
err: &CompactionError,
|
||||
error_count: u32,
|
||||
sleep_duration: Duration,
|
||||
task_cancelled: bool,
|
||||
) {
|
||||
use crate::tenant::upload_queue::NotInitialized;
|
||||
use crate::tenant::PageReconstructError;
|
||||
use CompactionError::*;
|
||||
|
||||
enum LooksLike {
|
||||
Info,
|
||||
Error,
|
||||
}
|
||||
let level = match err {
|
||||
ShuttingDown => return,
|
||||
Offload(_) => Level::ERROR,
|
||||
_ if task_cancelled => Level::INFO,
|
||||
Other(err) => {
|
||||
let root_cause = err.root_cause();
|
||||
|
||||
let decision = match e {
|
||||
ShuttingDown => None,
|
||||
Offload(_) => Some(LooksLike::Error),
|
||||
_ if task_cancelled => Some(LooksLike::Info),
|
||||
Other(e) => {
|
||||
let root_cause = e.root_cause();
|
||||
|
||||
let is_stopping = {
|
||||
let upload_queue = root_cause
|
||||
.downcast_ref::<NotInitialized>()
|
||||
.is_some_and(|e| e.is_stopping());
|
||||
|
||||
let timeline = root_cause
|
||||
.downcast_ref::<PageReconstructError>()
|
||||
.is_some_and(|e| e.is_stopping());
|
||||
|
||||
upload_queue || timeline
|
||||
};
|
||||
let upload_queue = root_cause
|
||||
.downcast_ref::<NotInitialized>()
|
||||
.is_some_and(|e| e.is_stopping());
|
||||
let timeline = root_cause
|
||||
.downcast_ref::<PageReconstructError>()
|
||||
.is_some_and(|e| e.is_stopping());
|
||||
let is_stopping = upload_queue || timeline;
|
||||
|
||||
if is_stopping {
|
||||
Some(LooksLike::Info)
|
||||
Level::INFO
|
||||
} else {
|
||||
Some(LooksLike::Error)
|
||||
Level::ERROR
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
match decision {
|
||||
Some(LooksLike::Info) => info!(
|
||||
"Compaction failed {error_run_count} times, retrying in {sleep_duration:?}: {e:#}",
|
||||
),
|
||||
Some(LooksLike::Error) => error!(
|
||||
"Compaction failed {error_run_count} times, retrying in {sleep_duration:?}: {e:?}",
|
||||
),
|
||||
None => {}
|
||||
match level {
|
||||
Level::ERROR => {
|
||||
error!("Compaction failed {error_count} times, retrying in {sleep_duration:?}: {err:#}")
|
||||
}
|
||||
Level::INFO => {
|
||||
info!("Compaction failed {error_count} times, retrying in {sleep_duration:?}: {err:#}")
|
||||
}
|
||||
level => unimplemented!("unexpected level {level:?}"),
|
||||
}
|
||||
}
|
||||
|
||||
///
|
||||
/// GC task's main loop
|
||||
///
|
||||
/// GC task's main loop.
|
||||
async fn gc_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
|
||||
const MAX_BACKOFF_SECS: f64 = 300.0;
|
||||
// How many errors we have seen consequtively
|
||||
let mut error_run_count = 0;
|
||||
let mut error_run = 0; // consecutive errors
|
||||
|
||||
TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
|
||||
async {
|
||||
// GC might require downloading, to find the cutoff LSN that corresponds to the
|
||||
// cutoff specified as time.
|
||||
let ctx =
|
||||
RequestContext::todo_child(TaskKind::GarbageCollector, DownloadBehavior::Download);
|
||||
// GC might require downloading, to find the cutoff LSN that corresponds to the
|
||||
// cutoff specified as time.
|
||||
let ctx = RequestContext::todo_child(TaskKind::GarbageCollector, DownloadBehavior::Download);
|
||||
let mut first = true;
|
||||
|
||||
let mut first = true;
|
||||
loop {
|
||||
tokio::select! {
|
||||
_ = cancel.cancelled() => {
|
||||
return;
|
||||
},
|
||||
tenant_wait_result = wait_for_active_tenant(&tenant) => match tenant_wait_result {
|
||||
ControlFlow::Break(()) => return,
|
||||
ControlFlow::Continue(()) => (),
|
||||
},
|
||||
}
|
||||
loop {
|
||||
if wait_for_active_tenant(&tenant, &cancel).await.is_break() {
|
||||
return;
|
||||
}
|
||||
|
||||
let period = tenant.get_gc_period();
|
||||
let period = tenant.get_gc_period();
|
||||
|
||||
if first {
|
||||
first = false;
|
||||
|
||||
let delays = async {
|
||||
random_init_delay(period, &cancel).await?;
|
||||
Ok::<_, Cancelled>(())
|
||||
};
|
||||
|
||||
if delays.await.is_err() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
let gc_horizon = tenant.get_gc_horizon();
|
||||
let sleep_duration;
|
||||
if period == Duration::ZERO || gc_horizon == 0 {
|
||||
#[cfg(not(feature = "testing"))]
|
||||
info!("automatic GC is disabled");
|
||||
// check again in 10 seconds, in case it's been enabled again.
|
||||
sleep_duration = Duration::from_secs(10);
|
||||
} else {
|
||||
let iteration = Iteration {
|
||||
started_at: Instant::now(),
|
||||
period,
|
||||
kind: BackgroundLoopKind::Gc,
|
||||
};
|
||||
// Run gc
|
||||
let IterationResult { output, elapsed: _ } =
|
||||
iteration.run(tenant.gc_iteration(None, gc_horizon, tenant.get_pitr_interval(), &cancel, &ctx))
|
||||
.await;
|
||||
match output {
|
||||
Ok(_) => {
|
||||
error_run_count = 0;
|
||||
sleep_duration = period;
|
||||
}
|
||||
Err(crate::tenant::GcError::TenantCancelled) => {
|
||||
return;
|
||||
}
|
||||
Err(e) => {
|
||||
let wait_duration = backoff::exponential_backoff_duration_seconds(
|
||||
error_run_count + 1,
|
||||
1.0,
|
||||
MAX_BACKOFF_SECS,
|
||||
);
|
||||
error_run_count += 1;
|
||||
let wait_duration = Duration::from_secs_f64(wait_duration);
|
||||
|
||||
if matches!(e, crate::tenant::GcError::TimelineCancelled) {
|
||||
// Timeline was cancelled during gc. We might either be in an event
|
||||
// that affects the entire tenant (tenant deletion, pageserver shutdown),
|
||||
// or in one that affects the timeline only (timeline deletion).
|
||||
// Therefore, don't exit the loop.
|
||||
info!("Gc failed {error_run_count} times, retrying in {wait_duration:?}: {e:?}");
|
||||
} else {
|
||||
error!("Gc failed {error_run_count} times, retrying in {wait_duration:?}: {e:?}");
|
||||
}
|
||||
|
||||
sleep_duration = wait_duration;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
if tokio::time::timeout(sleep_duration, cancel.cancelled())
|
||||
.await
|
||||
.is_ok()
|
||||
{
|
||||
if first {
|
||||
first = false;
|
||||
if sleep_random(period, &cancel).await.is_err() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
.await;
|
||||
TENANT_TASK_EVENTS.with_label_values(&["stop"]).inc();
|
||||
}
|
||||
|
||||
async fn ingest_housekeeping_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
|
||||
TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
|
||||
async {
|
||||
let mut last_throttle_flag_reset_at = Instant::now();
|
||||
loop {
|
||||
tokio::select! {
|
||||
_ = cancel.cancelled() => {
|
||||
return;
|
||||
},
|
||||
tenant_wait_result = wait_for_active_tenant(&tenant) => match tenant_wait_result {
|
||||
ControlFlow::Break(()) => return,
|
||||
ControlFlow::Continue(()) => (),
|
||||
},
|
||||
}
|
||||
|
||||
// We run ingest housekeeping with the same frequency as compaction: it is not worth
|
||||
// having a distinct setting. But we don't run it in the same task, because compaction
|
||||
// blocks on acquiring the background job semaphore.
|
||||
let period = tenant.get_compaction_period();
|
||||
|
||||
// If compaction period is set to zero (to disable it), then we will use a reasonable default
|
||||
let period = if period == Duration::ZERO {
|
||||
humantime::Duration::from_str(
|
||||
pageserver_api::config::tenant_conf_defaults::DEFAULT_COMPACTION_PERIOD,
|
||||
)
|
||||
.unwrap()
|
||||
.into()
|
||||
} else {
|
||||
period
|
||||
};
|
||||
|
||||
// Jitter the period by +/- 5%
|
||||
let period =
|
||||
rand::thread_rng().gen_range((period * (95)) / 100..(period * (105)) / 100);
|
||||
|
||||
// Always sleep first: we do not need to do ingest housekeeping early in the lifetime of
|
||||
// a tenant, since it won't have started writing any ephemeral files yet.
|
||||
if tokio::time::timeout(period, cancel.cancelled())
|
||||
.await
|
||||
.is_ok()
|
||||
{
|
||||
break;
|
||||
}
|
||||
|
||||
let gc_horizon = tenant.get_gc_horizon();
|
||||
let sleep_duration;
|
||||
if period == Duration::ZERO || gc_horizon == 0 {
|
||||
#[cfg(not(feature = "testing"))]
|
||||
info!("automatic GC is disabled");
|
||||
// check again in 10 seconds, in case it's been enabled again.
|
||||
sleep_duration = Duration::from_secs(10);
|
||||
} else {
|
||||
let iteration = Iteration {
|
||||
started_at: Instant::now(),
|
||||
period,
|
||||
kind: BackgroundLoopKind::IngestHouseKeeping,
|
||||
kind: BackgroundLoopKind::Gc,
|
||||
};
|
||||
iteration.run(tenant.ingest_housekeeping()).await;
|
||||
|
||||
// TODO: rename the background loop kind to something more generic, like, tenant housekeeping.
|
||||
// Or just spawn another background loop for this throttle, it's not like it's super costly.
|
||||
info_span!(parent: None, "pagestream_throttle", tenant_id=%tenant.tenant_shard_id, shard_id=%tenant.tenant_shard_id.shard_slug()).in_scope(|| {
|
||||
let now = Instant::now();
|
||||
let prev = std::mem::replace(&mut last_throttle_flag_reset_at, now);
|
||||
let Stats { count_accounted_start, count_accounted_finish, count_throttled, sum_throttled_usecs} = tenant.pagestream_throttle.reset_stats();
|
||||
if count_throttled == 0 {
|
||||
// Run gc
|
||||
let IterationResult { output, elapsed: _ } = iteration
|
||||
.run(tenant.gc_iteration(
|
||||
None,
|
||||
gc_horizon,
|
||||
tenant.get_pitr_interval(),
|
||||
&cancel,
|
||||
&ctx,
|
||||
))
|
||||
.await;
|
||||
match output {
|
||||
Ok(_) => {
|
||||
error_run = 0;
|
||||
sleep_duration = period;
|
||||
}
|
||||
Err(crate::tenant::GcError::TenantCancelled) => {
|
||||
return;
|
||||
}
|
||||
let allowed_rps = tenant.pagestream_throttle.steady_rps();
|
||||
let delta = now - prev;
|
||||
info!(
|
||||
n_seconds=%format_args!("{:.3}", delta.as_secs_f64()),
|
||||
count_accounted = count_accounted_finish, // don't break existing log scraping
|
||||
count_throttled,
|
||||
sum_throttled_usecs,
|
||||
count_accounted_start, // log after pre-existing fields to not break existing log scraping
|
||||
allowed_rps=%format_args!("{allowed_rps:.0}"),
|
||||
"shard was throttled in the last n_seconds"
|
||||
);
|
||||
});
|
||||
}
|
||||
}
|
||||
.await;
|
||||
TENANT_TASK_EVENTS.with_label_values(&["stop"]).inc();
|
||||
}
|
||||
Err(e) => {
|
||||
error_run += 1;
|
||||
let wait_duration =
|
||||
exponential_backoff_duration(error_run, 1.0, MAX_BACKOFF_SECS);
|
||||
|
||||
async fn wait_for_active_tenant(tenant: &Arc<Tenant>) -> ControlFlow<()> {
|
||||
// if the tenant has a proper status already, no need to wait for anything
|
||||
if tenant.current_state() == TenantState::Active {
|
||||
ControlFlow::Continue(())
|
||||
} else {
|
||||
let mut tenant_state_updates = tenant.subscribe_for_state_updates();
|
||||
loop {
|
||||
match tenant_state_updates.changed().await {
|
||||
Ok(()) => {
|
||||
let new_state = &*tenant_state_updates.borrow();
|
||||
match new_state {
|
||||
TenantState::Active => {
|
||||
debug!("Tenant state changed to active, continuing the task loop");
|
||||
return ControlFlow::Continue(());
|
||||
}
|
||||
state => {
|
||||
debug!("Not running the task loop, tenant is not active: {state:?}");
|
||||
continue;
|
||||
}
|
||||
if matches!(e, crate::tenant::GcError::TimelineCancelled) {
|
||||
// Timeline was cancelled during gc. We might either be in an event
|
||||
// that affects the entire tenant (tenant deletion, pageserver shutdown),
|
||||
// or in one that affects the timeline only (timeline deletion).
|
||||
// Therefore, don't exit the loop.
|
||||
info!("Gc failed {error_run} times, retrying in {wait_duration:?}: {e:?}");
|
||||
} else {
|
||||
error!("Gc failed {error_run} times, retrying in {wait_duration:?}: {e:?}");
|
||||
}
|
||||
}
|
||||
Err(_sender_dropped_error) => {
|
||||
return ControlFlow::Break(());
|
||||
|
||||
sleep_duration = wait_duration;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
if tokio::time::timeout(sleep_duration, cancel.cancelled())
|
||||
.await
|
||||
.is_ok()
|
||||
{
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Tenant housekeeping's main loop.
|
||||
async fn tenant_housekeeping_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
|
||||
let mut last_throttle_flag_reset_at = Instant::now();
|
||||
loop {
|
||||
if wait_for_active_tenant(&tenant, &cancel).await.is_break() {
|
||||
return;
|
||||
}
|
||||
|
||||
// Use the same period as compaction; it's not worth a separate setting. But if it's set to
|
||||
// zero (to disable compaction), then use a reasonable default. Jitter it by 5%.
|
||||
let period = match tenant.get_compaction_period() {
|
||||
Duration::ZERO => humantime::parse_duration(DEFAULT_COMPACTION_PERIOD).unwrap(),
|
||||
period => period,
|
||||
};
|
||||
|
||||
let Ok(period) = sleep_jitter(period, period * 5 / 100, &cancel).await else {
|
||||
break;
|
||||
};
|
||||
|
||||
// Do tenant housekeeping.
|
||||
let iteration = Iteration {
|
||||
started_at: Instant::now(),
|
||||
period,
|
||||
kind: BackgroundLoopKind::TenantHouseKeeping,
|
||||
};
|
||||
iteration.run(tenant.housekeeping()).await;
|
||||
|
||||
// Log any getpage throttling.
|
||||
info_span!(parent: None, "pagestream_throttle", tenant_id=%tenant.tenant_shard_id, shard_id=%tenant.tenant_shard_id.shard_slug()).in_scope(|| {
|
||||
let now = Instant::now();
|
||||
let prev = std::mem::replace(&mut last_throttle_flag_reset_at, now);
|
||||
let Stats { count_accounted_start, count_accounted_finish, count_throttled, sum_throttled_usecs} = tenant.pagestream_throttle.reset_stats();
|
||||
if count_throttled == 0 {
|
||||
return;
|
||||
}
|
||||
let allowed_rps = tenant.pagestream_throttle.steady_rps();
|
||||
let delta = now - prev;
|
||||
info!(
|
||||
n_seconds=%format_args!("{:.3}", delta.as_secs_f64()),
|
||||
count_accounted = count_accounted_finish, // don't break existing log scraping
|
||||
count_throttled,
|
||||
sum_throttled_usecs,
|
||||
count_accounted_start, // log after pre-existing fields to not break existing log scraping
|
||||
allowed_rps=%format_args!("{allowed_rps:.0}"),
|
||||
"shard was throttled in the last n_seconds"
|
||||
);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
/// Waits until the tenant becomes active, or returns `ControlFlow::Break()` to shut down.
|
||||
async fn wait_for_active_tenant(
|
||||
tenant: &Arc<Tenant>,
|
||||
cancel: &CancellationToken,
|
||||
) -> ControlFlow<()> {
|
||||
if tenant.current_state() == TenantState::Active {
|
||||
return ControlFlow::Continue(());
|
||||
}
|
||||
|
||||
let mut update_rx = tenant.subscribe_for_state_updates();
|
||||
loop {
|
||||
tokio::select! {
|
||||
_ = cancel.cancelled() => return ControlFlow::Break(()),
|
||||
result = update_rx.changed() => if result.is_err() {
|
||||
return ControlFlow::Break(());
|
||||
}
|
||||
}
|
||||
|
||||
match &*update_rx.borrow() {
|
||||
TenantState::Active => {
|
||||
debug!("Tenant state changed to active, continuing the task loop");
|
||||
return ControlFlow::Continue(());
|
||||
}
|
||||
state => debug!("Not running the task loop, tenant is not active: {state:?}"),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -558,26 +505,41 @@ async fn wait_for_active_tenant(tenant: &Arc<Tenant>) -> ControlFlow<()> {
|
||||
#[error("cancelled")]
|
||||
pub(crate) struct Cancelled;
|
||||
|
||||
/// Provide a random delay for background task initialization.
|
||||
/// Sleeps for a random interval up to the given max value.
|
||||
///
|
||||
/// This delay prevents a thundering herd of background tasks and will likely keep them running on
|
||||
/// different periods for more stable load.
|
||||
pub(crate) async fn random_init_delay(
|
||||
period: Duration,
|
||||
pub(crate) async fn sleep_random(
|
||||
max: Duration,
|
||||
cancel: &CancellationToken,
|
||||
) -> Result<(), Cancelled> {
|
||||
if period == Duration::ZERO {
|
||||
return Ok(());
|
||||
}
|
||||
) -> Result<Duration, Cancelled> {
|
||||
sleep_random_range(Duration::ZERO..=max, cancel).await
|
||||
}
|
||||
|
||||
let d = {
|
||||
let mut rng = rand::thread_rng();
|
||||
rng.gen_range(Duration::ZERO..=period)
|
||||
};
|
||||
match tokio::time::timeout(d, cancel.cancelled()).await {
|
||||
Ok(_) => Err(Cancelled),
|
||||
Err(_) => Ok(()),
|
||||
/// Sleeps for a random interval in the given range. Returns the duration.
|
||||
pub(crate) async fn sleep_random_range(
|
||||
interval: RangeInclusive<Duration>,
|
||||
cancel: &CancellationToken,
|
||||
) -> Result<Duration, Cancelled> {
|
||||
let delay = rand::thread_rng().gen_range(interval);
|
||||
if delay == Duration::ZERO {
|
||||
return Ok(delay);
|
||||
}
|
||||
tokio::select! {
|
||||
_ = cancel.cancelled() => Err(Cancelled),
|
||||
_ = tokio::time::sleep(delay) => Ok(delay),
|
||||
}
|
||||
}
|
||||
|
||||
/// Sleeps for an interval with a random jitter.
|
||||
pub(crate) async fn sleep_jitter(
|
||||
duration: Duration,
|
||||
jitter: Duration,
|
||||
cancel: &CancellationToken,
|
||||
) -> Result<Duration, Cancelled> {
|
||||
let from = duration.saturating_sub(jitter);
|
||||
let to = duration.saturating_add(jitter);
|
||||
sleep_random_range(from..=to, cancel).await
|
||||
}
|
||||
|
||||
struct Iteration {
|
||||
@@ -593,42 +555,25 @@ struct IterationResult<O> {
|
||||
|
||||
impl Iteration {
|
||||
#[instrument(skip_all)]
|
||||
pub(crate) async fn run<Fut, O>(self, fut: Fut) -> IterationResult<O>
|
||||
where
|
||||
Fut: std::future::Future<Output = O>,
|
||||
{
|
||||
let Self {
|
||||
started_at,
|
||||
period,
|
||||
kind,
|
||||
} = self;
|
||||
|
||||
let mut fut = std::pin::pin!(fut);
|
||||
pub(crate) async fn run<F: Future<Output = O>, O>(self, fut: F) -> IterationResult<O> {
|
||||
let mut fut = pin!(fut);
|
||||
|
||||
// Wrap `fut` into a future that logs a message every `period` so that we get a
|
||||
// very obvious breadcrumb in the logs _while_ a slow iteration is happening.
|
||||
let liveness_logger = async move {
|
||||
loop {
|
||||
match tokio::time::timeout(period, &mut fut).await {
|
||||
Ok(x) => return x,
|
||||
Err(_) => {
|
||||
// info level as per the same rationale why warn_when_period_overrun is info
|
||||
// => https://github.com/neondatabase/neon/pull/5724
|
||||
info!("still running");
|
||||
}
|
||||
}
|
||||
let output = loop {
|
||||
match tokio::time::timeout(self.period, &mut fut).await {
|
||||
Ok(r) => break r,
|
||||
Err(_) => info!("still running"),
|
||||
}
|
||||
};
|
||||
|
||||
let output = liveness_logger.await;
|
||||
|
||||
let elapsed = started_at.elapsed();
|
||||
warn_when_period_overrun(elapsed, period, kind);
|
||||
let elapsed = self.started_at.elapsed();
|
||||
warn_when_period_overrun(elapsed, self.period, self.kind);
|
||||
|
||||
IterationResult { output, elapsed }
|
||||
}
|
||||
}
|
||||
/// Attention: the `task` and `period` beocme labels of a pageserver-wide prometheus metric.
|
||||
|
||||
// NB: the `task` and `period` are used for metrics labels.
|
||||
pub(crate) fn warn_when_period_overrun(
|
||||
elapsed: Duration,
|
||||
period: Duration,
|
||||
|
||||
@@ -45,11 +45,9 @@ use rand::Rng;
|
||||
use remote_storage::DownloadError;
|
||||
use serde_with::serde_as;
|
||||
use storage_broker::BrokerClientChannel;
|
||||
use tokio::runtime::Handle;
|
||||
use tokio::sync::mpsc::Sender;
|
||||
use tokio::{
|
||||
runtime::Handle,
|
||||
sync::{oneshot, watch},
|
||||
};
|
||||
use tokio::sync::{oneshot, watch, Notify};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::*;
|
||||
use utils::critical;
|
||||
@@ -227,6 +225,7 @@ pub struct TimelineResources {
|
||||
pub remote_client: RemoteTimelineClient,
|
||||
pub pagestream_throttle: Arc<crate::tenant::throttle::Throttle>,
|
||||
pub pagestream_throttle_metrics: Arc<crate::metrics::tenant_throttling::Pagestream>,
|
||||
pub l0_compaction_trigger: Arc<Notify>,
|
||||
pub l0_flush_global_state: l0_flush::L0FlushGlobalState,
|
||||
}
|
||||
|
||||
@@ -426,6 +425,9 @@ pub struct Timeline {
|
||||
/// If true, the last compaction failed.
|
||||
compaction_failed: AtomicBool,
|
||||
|
||||
/// Notifies the tenant compaction loop that there is pending L0 compaction work.
|
||||
l0_compaction_trigger: Arc<Notify>,
|
||||
|
||||
/// Make sure we only have one running gc at a time.
|
||||
///
|
||||
/// Must only be taken in two places:
|
||||
@@ -626,6 +628,71 @@ impl From<layer_manager::Shutdown> for GetVectoredError {
|
||||
}
|
||||
}
|
||||
|
||||
/// A layer identifier when used in the [`ReadPath`] structure. This enum is for observability purposes
|
||||
/// only and not used by the "real read path".
|
||||
pub enum ReadPathLayerId {
|
||||
PersistentLayer(PersistentLayerKey),
|
||||
InMemoryLayer(Range<Lsn>),
|
||||
}
|
||||
|
||||
impl std::fmt::Display for ReadPathLayerId {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
match self {
|
||||
ReadPathLayerId::PersistentLayer(key) => write!(f, "{}", key),
|
||||
ReadPathLayerId::InMemoryLayer(range) => {
|
||||
write!(f, "in-mem {}..{}", range.start, range.end)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
pub struct ReadPath {
|
||||
keyspace: KeySpace,
|
||||
lsn: Lsn,
|
||||
path: Vec<(ReadPathLayerId, KeySpace, Range<Lsn>)>,
|
||||
}
|
||||
|
||||
impl ReadPath {
|
||||
pub fn new(keyspace: KeySpace, lsn: Lsn) -> Self {
|
||||
Self {
|
||||
keyspace,
|
||||
lsn,
|
||||
path: Vec::new(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn record_layer_visit(
|
||||
&mut self,
|
||||
layer_to_read: &ReadableLayer,
|
||||
keyspace_to_read: &KeySpace,
|
||||
lsn_range: &Range<Lsn>,
|
||||
) {
|
||||
let id = match layer_to_read {
|
||||
ReadableLayer::PersistentLayer(layer) => {
|
||||
ReadPathLayerId::PersistentLayer(layer.layer_desc().key())
|
||||
}
|
||||
ReadableLayer::InMemoryLayer(layer) => {
|
||||
ReadPathLayerId::InMemoryLayer(layer.get_lsn_range())
|
||||
}
|
||||
};
|
||||
self.path
|
||||
.push((id, keyspace_to_read.clone(), lsn_range.clone()));
|
||||
}
|
||||
}
|
||||
|
||||
impl std::fmt::Display for ReadPath {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
writeln!(f, "Read path for {} at lsn {}:", self.keyspace, self.lsn)?;
|
||||
for (idx, (layer_id, keyspace, lsn_range)) in self.path.iter().enumerate() {
|
||||
writeln!(
|
||||
f,
|
||||
"{}: {} {}..{} {}",
|
||||
idx, layer_id, lsn_range.start, lsn_range.end, keyspace
|
||||
)?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(thiserror::Error)]
|
||||
pub struct MissingKeyError {
|
||||
key: Key,
|
||||
@@ -633,6 +700,8 @@ pub struct MissingKeyError {
|
||||
cont_lsn: Lsn,
|
||||
request_lsn: Lsn,
|
||||
ancestor_lsn: Option<Lsn>,
|
||||
/// Debug information about the read path if there's an error
|
||||
read_path: Option<ReadPath>,
|
||||
backtrace: Option<std::backtrace::Backtrace>,
|
||||
}
|
||||
|
||||
@@ -649,10 +718,15 @@ impl std::fmt::Display for MissingKeyError {
|
||||
"could not find data for key {} (shard {:?}) at LSN {}, request LSN {}",
|
||||
self.key, self.shard, self.cont_lsn, self.request_lsn
|
||||
)?;
|
||||
|
||||
if let Some(ref ancestor_lsn) = self.ancestor_lsn {
|
||||
write!(f, ", ancestor {}", ancestor_lsn)?;
|
||||
}
|
||||
|
||||
if let Some(ref read_path) = self.read_path {
|
||||
write!(f, "\n{}", read_path)?;
|
||||
}
|
||||
|
||||
if let Some(ref backtrace) = self.backtrace {
|
||||
write!(f, "\n{}", backtrace)?;
|
||||
}
|
||||
@@ -1069,6 +1143,7 @@ impl Timeline {
|
||||
request_lsn: lsn,
|
||||
ancestor_lsn: None,
|
||||
backtrace: None,
|
||||
read_path: None,
|
||||
})),
|
||||
}
|
||||
}
|
||||
@@ -1195,6 +1270,13 @@ impl Timeline {
|
||||
reconstruct_state: &mut ValuesReconstructState,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<BTreeMap<Key, Result<Bytes, PageReconstructError>>, GetVectoredError> {
|
||||
let read_path = if self.conf.enable_read_path_debugging {
|
||||
Some(ReadPath::new(keyspace.clone(), lsn))
|
||||
} else {
|
||||
None
|
||||
};
|
||||
reconstruct_state.read_path = read_path;
|
||||
|
||||
let traversal_res: Result<(), _> = self
|
||||
.get_vectored_reconstruct_data(keyspace.clone(), lsn, reconstruct_state, ctx)
|
||||
.await;
|
||||
@@ -1718,8 +1800,9 @@ impl Timeline {
|
||||
let prepare = async move {
|
||||
let guard = self.compaction_lock.lock().await;
|
||||
|
||||
let permit = super::tasks::concurrent_background_tasks_rate_limit_permit(
|
||||
let permit = super::tasks::acquire_concurrency_permit(
|
||||
BackgroundLoopKind::Compaction,
|
||||
self.conf.use_compaction_semaphore,
|
||||
ctx,
|
||||
)
|
||||
.await;
|
||||
@@ -2583,6 +2666,7 @@ impl Timeline {
|
||||
|
||||
compaction_lock: tokio::sync::Mutex::default(),
|
||||
compaction_failed: AtomicBool::default(),
|
||||
l0_compaction_trigger: resources.l0_compaction_trigger,
|
||||
gc_lock: tokio::sync::Mutex::default(),
|
||||
|
||||
standby_horizon: AtomicLsn::new(0),
|
||||
@@ -2632,7 +2716,7 @@ impl Timeline {
|
||||
return;
|
||||
}
|
||||
FlushLoopState::Exited => {
|
||||
warn!(
|
||||
info!(
|
||||
"ignoring attempt to restart exited flush_loop {}/{}",
|
||||
self.tenant_shard_id, self.timeline_id
|
||||
);
|
||||
@@ -3056,8 +3140,9 @@ impl Timeline {
|
||||
let self_ref = &self;
|
||||
let skip_concurrency_limiter = &skip_concurrency_limiter;
|
||||
async move {
|
||||
let wait_for_permit = super::tasks::concurrent_background_tasks_rate_limit_permit(
|
||||
let wait_for_permit = super::tasks::acquire_concurrency_permit(
|
||||
BackgroundLoopKind::InitialLogicalSizeCalculation,
|
||||
false,
|
||||
background_ctx,
|
||||
);
|
||||
|
||||
@@ -3502,6 +3587,7 @@ impl Timeline {
|
||||
request_lsn,
|
||||
ancestor_lsn: Some(timeline.ancestor_lsn),
|
||||
backtrace: None,
|
||||
read_path: std::mem::take(&mut reconstruct_state.read_path),
|
||||
}));
|
||||
}
|
||||
|
||||
@@ -3620,6 +3706,9 @@ impl Timeline {
|
||||
}
|
||||
|
||||
if let Some((layer_to_read, keyspace_to_read, lsn_range)) = fringe.next_layer() {
|
||||
if let Some(ref mut read_path) = reconstruct_state.read_path {
|
||||
read_path.record_layer_visit(&layer_to_read, &keyspace_to_read, &lsn_range);
|
||||
}
|
||||
let next_cont_lsn = lsn_range.start;
|
||||
layer_to_read
|
||||
.get_values_reconstruct_data(
|
||||
@@ -3920,6 +4009,12 @@ impl Timeline {
|
||||
}
|
||||
let flush_duration = flush_timer.stop_and_record();
|
||||
|
||||
// Notify the tenant compaction loop if L0 compaction is needed.
|
||||
let l0_count = *watch_l0.borrow();
|
||||
if l0_count >= self.get_compaction_threshold() {
|
||||
self.l0_compaction_trigger.notify_one();
|
||||
}
|
||||
|
||||
// Delay the next flush to backpressure if compaction can't keep up. We delay by the
|
||||
// flush duration such that the flush takes 2x as long. This is propagated up to WAL
|
||||
// ingestion by having ephemeral layer rolls wait for flushes.
|
||||
|
||||
@@ -687,6 +687,20 @@ impl Timeline {
|
||||
|
||||
// Define partitioning schema if needed
|
||||
|
||||
let l0_l1_boundary_lsn = {
|
||||
// We do the repartition on the L0-L1 boundary. All data below the boundary
|
||||
// are compacted by L0 with low read amplification, thus making the `repartition`
|
||||
// function run fast.
|
||||
let guard = self.layers.read().await;
|
||||
let l0_min_lsn = guard
|
||||
.layer_map()?
|
||||
.level0_deltas()
|
||||
.iter()
|
||||
.map(|l| l.get_lsn_range().start)
|
||||
.min()
|
||||
.unwrap_or(self.get_disk_consistent_lsn());
|
||||
l0_min_lsn.max(self.get_ancestor_lsn())
|
||||
};
|
||||
// 1. L0 Compact
|
||||
let l0_compaction_outcome = {
|
||||
let timer = self.metrics.compact_time_histo.start_timer();
|
||||
@@ -709,80 +723,87 @@ impl Timeline {
|
||||
return Ok(CompactionOutcome::Pending);
|
||||
}
|
||||
|
||||
// 2. Repartition and create image layers if necessary
|
||||
let partition_count = match self
|
||||
.repartition(
|
||||
self.get_last_record_lsn(), // TODO: use L0-L1 boundary
|
||||
self.get_compaction_target_size(),
|
||||
options.flags,
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(((dense_partitioning, sparse_partitioning), lsn)) => {
|
||||
// Disables access_stats updates, so that the files we read remain candidates for eviction after we're done with them
|
||||
let image_ctx = RequestContextBuilder::extend(ctx)
|
||||
.access_stats_behavior(AccessStatsBehavior::Skip)
|
||||
.build();
|
||||
if l0_l1_boundary_lsn < self.partitioning.read().1 {
|
||||
// We never go backwards when repartition and create image layers.
|
||||
info!("skipping image layer generation because repartition LSN is greater than L0-L1 boundary LSN.");
|
||||
} else {
|
||||
// 2. Repartition and create image layers if necessary
|
||||
match self
|
||||
.repartition(
|
||||
l0_l1_boundary_lsn,
|
||||
self.get_compaction_target_size(),
|
||||
options.flags,
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(((dense_partitioning, sparse_partitioning), lsn)) => {
|
||||
// Disables access_stats updates, so that the files we read remain candidates for eviction after we're done with them
|
||||
let image_ctx = RequestContextBuilder::extend(ctx)
|
||||
.access_stats_behavior(AccessStatsBehavior::Skip)
|
||||
.build();
|
||||
|
||||
let mut partitioning = dense_partitioning;
|
||||
partitioning
|
||||
.parts
|
||||
.extend(sparse_partitioning.into_dense().parts);
|
||||
let mut partitioning = dense_partitioning;
|
||||
partitioning
|
||||
.parts
|
||||
.extend(sparse_partitioning.into_dense().parts);
|
||||
|
||||
// 3. Create new image layers for partitions that have been modified "enough".
|
||||
let (image_layers, outcome) = self
|
||||
.create_image_layers(
|
||||
&partitioning,
|
||||
lsn,
|
||||
if options
|
||||
.flags
|
||||
.contains(CompactFlags::ForceImageLayerCreation)
|
||||
{
|
||||
ImageLayerCreationMode::Force
|
||||
} else {
|
||||
ImageLayerCreationMode::Try
|
||||
},
|
||||
&image_ctx,
|
||||
self.last_image_layer_creation_status
|
||||
.load()
|
||||
.as_ref()
|
||||
.clone(),
|
||||
)
|
||||
.await
|
||||
.inspect_err(|err| {
|
||||
if let CreateImageLayersError::GetVectoredError(
|
||||
GetVectoredError::MissingKey(_),
|
||||
) = err
|
||||
{
|
||||
critical!("missing key during compaction: {err:?}");
|
||||
}
|
||||
})?;
|
||||
// 3. Create new image layers for partitions that have been modified "enough".
|
||||
let (image_layers, outcome) = self
|
||||
.create_image_layers(
|
||||
&partitioning,
|
||||
lsn,
|
||||
if options
|
||||
.flags
|
||||
.contains(CompactFlags::ForceImageLayerCreation)
|
||||
{
|
||||
ImageLayerCreationMode::Force
|
||||
} else {
|
||||
ImageLayerCreationMode::Try
|
||||
},
|
||||
&image_ctx,
|
||||
self.last_image_layer_creation_status
|
||||
.load()
|
||||
.as_ref()
|
||||
.clone(),
|
||||
)
|
||||
.await
|
||||
.inspect_err(|err| {
|
||||
if let CreateImageLayersError::GetVectoredError(
|
||||
GetVectoredError::MissingKey(_),
|
||||
) = err
|
||||
{
|
||||
critical!("missing key during compaction: {err:?}");
|
||||
}
|
||||
})?;
|
||||
|
||||
self.last_image_layer_creation_status
|
||||
.store(Arc::new(outcome.clone()));
|
||||
self.last_image_layer_creation_status
|
||||
.store(Arc::new(outcome.clone()));
|
||||
|
||||
self.upload_new_image_layers(image_layers)?;
|
||||
if let LastImageLayerCreationStatus::Incomplete { .. } = outcome {
|
||||
// Yield and do not do any other kind of compaction.
|
||||
info!("skipping shard ancestor compaction due to pending image layer generation tasks (preempted by L0 compaction).");
|
||||
return Ok(CompactionOutcome::Pending);
|
||||
self.upload_new_image_layers(image_layers)?;
|
||||
if let LastImageLayerCreationStatus::Incomplete { .. } = outcome {
|
||||
// Yield and do not do any other kind of compaction.
|
||||
info!("skipping shard ancestor compaction due to pending image layer generation tasks (preempted by L0 compaction).");
|
||||
return Ok(CompactionOutcome::Pending);
|
||||
}
|
||||
}
|
||||
partitioning.parts.len()
|
||||
}
|
||||
Err(err) => {
|
||||
// no partitioning? This is normal, if the timeline was just created
|
||||
// as an empty timeline. Also in unit tests, when we use the timeline
|
||||
// as a simple key-value store, ignoring the datadir layout. Log the
|
||||
// error but continue.
|
||||
//
|
||||
// Suppress error when it's due to cancellation
|
||||
if !self.cancel.is_cancelled() && !err.is_cancelled() {
|
||||
tracing::error!("could not compact, repartitioning keyspace failed: {err:?}");
|
||||
Err(err) => {
|
||||
// no partitioning? This is normal, if the timeline was just created
|
||||
// as an empty timeline. Also in unit tests, when we use the timeline
|
||||
// as a simple key-value store, ignoring the datadir layout. Log the
|
||||
// error but continue.
|
||||
//
|
||||
// Suppress error when it's due to cancellation
|
||||
if !self.cancel.is_cancelled() && !err.is_cancelled() {
|
||||
tracing::error!(
|
||||
"could not compact, repartitioning keyspace failed: {err:?}"
|
||||
);
|
||||
}
|
||||
}
|
||||
1
|
||||
}
|
||||
};
|
||||
};
|
||||
}
|
||||
|
||||
let partition_count = self.partitioning.read().0 .0.parts.len();
|
||||
|
||||
// 4. Shard ancestor compaction
|
||||
|
||||
@@ -2238,8 +2259,11 @@ impl Timeline {
|
||||
split_key_ranges.push((start, end));
|
||||
}
|
||||
split_key_ranges.sort();
|
||||
let guard = self.layers.read().await;
|
||||
let layer_map = guard.layer_map()?;
|
||||
let all_layers = {
|
||||
let guard = self.layers.read().await;
|
||||
let layer_map = guard.layer_map()?;
|
||||
layer_map.iter_historic_layers().collect_vec()
|
||||
};
|
||||
let mut current_start = None;
|
||||
let ranges_num = split_key_ranges.len();
|
||||
for (idx, (start, end)) in split_key_ranges.into_iter().enumerate() {
|
||||
@@ -2251,14 +2275,23 @@ impl Timeline {
|
||||
// We have already processed this partition.
|
||||
continue;
|
||||
}
|
||||
let res = layer_map.range_search(start..end, compact_below_lsn);
|
||||
let total_size = res.found.keys().map(|x| x.layer.file_size()).sum::<u64>();
|
||||
let overlapping_layers = {
|
||||
let mut desc = Vec::new();
|
||||
for layer in all_layers.iter() {
|
||||
if overlaps_with(&layer.get_key_range(), &(start..end))
|
||||
&& layer.get_lsn_range().start <= compact_below_lsn
|
||||
{
|
||||
desc.push(layer.clone());
|
||||
}
|
||||
}
|
||||
desc
|
||||
};
|
||||
let total_size = overlapping_layers.iter().map(|x| x.file_size).sum::<u64>();
|
||||
if total_size > sub_compaction_max_job_size_mb * 1024 * 1024 || ranges_num == idx + 1 {
|
||||
// Try to extend the compaction range so that we include at least one full layer file.
|
||||
let extended_end = res
|
||||
.found
|
||||
.keys()
|
||||
.map(|layer| layer.layer.key_range.end)
|
||||
let extended_end = overlapping_layers
|
||||
.iter()
|
||||
.map(|layer| layer.key_range.end)
|
||||
.min();
|
||||
// It is possible that the search range does not contain any layer files when we reach the end of the loop.
|
||||
// In this case, we simply use the specified key range end.
|
||||
@@ -2285,7 +2318,6 @@ impl Timeline {
|
||||
current_start = Some(end);
|
||||
}
|
||||
}
|
||||
drop(guard);
|
||||
Ok(compact_jobs)
|
||||
}
|
||||
|
||||
|
||||
@@ -17,13 +17,11 @@ use crate::{
|
||||
metadata::TimelineMetadata,
|
||||
remote_timeline_client::{PersistIndexPartWithDeletedFlagError, RemoteTimelineClient},
|
||||
CreateTimelineCause, DeleteTimelineError, MaybeDeletedIndexPart, Tenant,
|
||||
TenantManifestError, TimelineOrOffloaded,
|
||||
TenantManifestError, Timeline, TimelineOrOffloaded,
|
||||
},
|
||||
virtual_file::MaybeFatalIo,
|
||||
};
|
||||
|
||||
use super::{Timeline, TimelineResources};
|
||||
|
||||
/// Mark timeline as deleted in S3 so we won't pick it up next time
|
||||
/// during attach or pageserver restart.
|
||||
/// See comment in persist_index_part_with_deleted_flag.
|
||||
@@ -296,12 +294,7 @@ impl DeleteTimelineFlow {
|
||||
timeline_id,
|
||||
local_metadata,
|
||||
None, // Ancestor is not needed for deletion.
|
||||
TimelineResources {
|
||||
remote_client,
|
||||
pagestream_throttle: tenant.pagestream_throttle.clone(),
|
||||
pagestream_throttle_metrics: tenant.pagestream_throttle_metrics.clone(),
|
||||
l0_flush_global_state: tenant.l0_flush_global_state.clone(),
|
||||
},
|
||||
tenant.get_timeline_resources_for(remote_client),
|
||||
// Important. We dont pass ancestor above because it can be missing.
|
||||
// Thus we need to skip the validation here.
|
||||
CreateTimelineCause::Delete,
|
||||
@@ -341,6 +334,13 @@ impl DeleteTimelineFlow {
|
||||
let tenant_shard_id = timeline.tenant_shard_id();
|
||||
let timeline_id = timeline.timeline_id();
|
||||
|
||||
// Take a tenant gate guard, because timeline deletion needs access to the tenant to update its manifest.
|
||||
let Ok(tenant_guard) = tenant.gate.enter() else {
|
||||
// It is safe to simply skip here, because we only schedule background work once the timeline is durably marked for deletion.
|
||||
info!("Tenant is shutting down, timeline deletion will be resumed when it next starts");
|
||||
return;
|
||||
};
|
||||
|
||||
task_mgr::spawn(
|
||||
task_mgr::BACKGROUND_RUNTIME.handle(),
|
||||
TaskKind::TimelineDeletionWorker,
|
||||
@@ -348,6 +348,8 @@ impl DeleteTimelineFlow {
|
||||
Some(timeline_id),
|
||||
"timeline_delete",
|
||||
async move {
|
||||
let _guard = tenant_guard;
|
||||
|
||||
if let Err(err) = Self::background(guard, conf, &tenant, &timeline, remote_client).await {
|
||||
// Only log as an error if it's not a cancellation.
|
||||
if matches!(err, DeleteTimelineError::Cancelled) {
|
||||
|
||||
@@ -32,7 +32,7 @@ use crate::{
|
||||
tenant::{
|
||||
size::CalculateSyntheticSizeError,
|
||||
storage_layer::LayerVisibilityHint,
|
||||
tasks::{BackgroundLoopKind, BackgroundLoopSemaphorePermit},
|
||||
tasks::{sleep_random, BackgroundLoopKind, BackgroundLoopSemaphorePermit},
|
||||
timeline::EvictionError,
|
||||
LogicalSizeCalculationCause, Tenant,
|
||||
},
|
||||
@@ -83,8 +83,6 @@ impl Timeline {
|
||||
|
||||
#[instrument(skip_all, fields(tenant_id = %self.tenant_shard_id.tenant_id, shard_id = %self.tenant_shard_id.shard_slug(), timeline_id = %self.timeline_id))]
|
||||
async fn eviction_task(self: Arc<Self>, tenant: Arc<Tenant>) {
|
||||
use crate::tenant::tasks::random_init_delay;
|
||||
|
||||
// acquire the gate guard only once within a useful span
|
||||
let Ok(guard) = self.gate.enter() else {
|
||||
return;
|
||||
@@ -97,7 +95,7 @@ impl Timeline {
|
||||
EvictionPolicy::OnlyImitiate(lat) => lat.period,
|
||||
EvictionPolicy::NoEviction => Duration::from_secs(10),
|
||||
};
|
||||
if random_init_delay(period, &self.cancel).await.is_err() {
|
||||
if sleep_random(period, &self.cancel).await.is_err() {
|
||||
return;
|
||||
}
|
||||
}
|
||||
@@ -334,8 +332,9 @@ impl Timeline {
|
||||
cancel: &CancellationToken,
|
||||
ctx: &RequestContext,
|
||||
) -> ControlFlow<(), BackgroundLoopSemaphorePermit<'static>> {
|
||||
let acquire_permit = crate::tenant::tasks::concurrent_background_tasks_rate_limit_permit(
|
||||
let acquire_permit = crate::tenant::tasks::acquire_concurrency_permit(
|
||||
BackgroundLoopKind::Eviction,
|
||||
false,
|
||||
ctx,
|
||||
);
|
||||
|
||||
|
||||
@@ -37,8 +37,8 @@ To play with it locally one may start proxy over a local postgres installation
|
||||
|
||||
If both postgres and proxy are running you may send a SQL query:
|
||||
```console
|
||||
curl -k -X POST 'https://proxy.localtest.me:4444/sql' \
|
||||
-H 'Neon-Connection-String: postgres://stas:pass@proxy.localtest.me:4444/postgres' \
|
||||
curl -k -X POST 'https://proxy.local.neon.build:4444/sql' \
|
||||
-H 'Neon-Connection-String: postgres://stas:pass@proxy.local.neon.build:4444/postgres' \
|
||||
-H 'Content-Type: application/json' \
|
||||
--data '{
|
||||
"query":"SELECT $1::int[] as arr, $2::jsonb as obj, 42 as num",
|
||||
@@ -104,7 +104,7 @@ cases where it is hard to use rows represented as objects (e.g. when several fie
|
||||
|
||||
## Test proxy locally
|
||||
|
||||
Proxy determines project name from the subdomain, request to the `round-rice-566201.somedomain.tld` will be routed to the project named `round-rice-566201`. Unfortunately, `/etc/hosts` does not support domain wildcards, so we can use *.localtest.me` which resolves to `127.0.0.1`.
|
||||
Proxy determines project name from the subdomain, request to the `round-rice-566201.somedomain.tld` will be routed to the project named `round-rice-566201`. Unfortunately, `/etc/hosts` does not support domain wildcards, so we can use *.local.neon.build` which resolves to `127.0.0.1`.
|
||||
|
||||
We will need to have a postgres instance. Assuming that we have set up docker we can set it up as follows:
|
||||
```sh
|
||||
@@ -125,7 +125,7 @@ docker exec -it proxy-postgres psql -U postgres -c "CREATE ROLE proxy WITH SUPER
|
||||
|
||||
Let's create self-signed certificate by running:
|
||||
```sh
|
||||
openssl req -new -x509 -days 365 -nodes -text -out server.crt -keyout server.key -subj "/CN=*.localtest.me"
|
||||
openssl req -new -x509 -days 365 -nodes -text -out server.crt -keyout server.key -subj "/CN=*.local.neon.build"
|
||||
```
|
||||
|
||||
Then we need to build proxy with 'testing' feature and run, e.g.:
|
||||
@@ -136,5 +136,5 @@ RUST_LOG=proxy cargo run -p proxy --bin proxy --features testing -- --auth-backe
|
||||
Now from client you can start a new session:
|
||||
|
||||
```sh
|
||||
PGSSLROOTCERT=./server.crt psql "postgresql://proxy:password@endpoint.localtest.me:4432/postgres?sslmode=verify-full"
|
||||
PGSSLROOTCERT=./server.crt psql "postgresql://proxy:password@endpoint.local.neon.build:4432/postgres?sslmode=verify-full"
|
||||
```
|
||||
|
||||
@@ -108,6 +108,10 @@ impl<T> Backend<'_, T> {
|
||||
Self::Local(_) => panic!("Local backend has no API"),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn is_local_proxy(&self) -> bool {
|
||||
matches!(self, Self::Local(_))
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, T> Backend<'a, T> {
|
||||
|
||||
@@ -69,17 +69,35 @@ pub async fn handle_cancel_messages(
|
||||
value,
|
||||
resp_tx,
|
||||
_guard,
|
||||
expire: _,
|
||||
expire,
|
||||
} => {
|
||||
let res = client.hset(&key, field, value).await;
|
||||
if let Some(resp_tx) = resp_tx {
|
||||
resp_tx
|
||||
.send(client.hset(key, field, value).await)
|
||||
.inspect_err(|e| {
|
||||
tracing::debug!("failed to send StoreCancelKey response: {:?}", e);
|
||||
})
|
||||
.ok();
|
||||
if res.is_ok() {
|
||||
resp_tx
|
||||
.send(client.expire(key, expire).await)
|
||||
.inspect_err(|e| {
|
||||
tracing::debug!(
|
||||
"failed to send StoreCancelKey response: {:?}",
|
||||
e
|
||||
);
|
||||
})
|
||||
.ok();
|
||||
} else {
|
||||
resp_tx
|
||||
.send(res)
|
||||
.inspect_err(|e| {
|
||||
tracing::debug!(
|
||||
"failed to send StoreCancelKey response: {:?}",
|
||||
e
|
||||
);
|
||||
})
|
||||
.ok();
|
||||
}
|
||||
} else if res.is_ok() {
|
||||
drop(client.expire(key, expire).await);
|
||||
} else {
|
||||
drop(client.hset(key, field, value).await);
|
||||
tracing::warn!("failed to store cancel key: {:?}", res);
|
||||
}
|
||||
}
|
||||
CancelKeyOp::GetCancelData {
|
||||
@@ -436,7 +454,7 @@ impl Session {
|
||||
&self.key
|
||||
}
|
||||
|
||||
// Send the store key op to the cancellation handler
|
||||
// Send the store key op to the cancellation handler and set TTL for the key
|
||||
pub(crate) async fn write_cancel_key(
|
||||
&self,
|
||||
cancel_closure: CancelClosure,
|
||||
|
||||
@@ -187,6 +187,10 @@ pub async fn worker(
|
||||
let rx = futures::stream::poll_fn(move |cx| rx.poll_recv(cx));
|
||||
let rx = rx.map(RequestData::from);
|
||||
|
||||
let storage = GenericRemoteStorage::from_config(&remote_storage_config)
|
||||
.await
|
||||
.context("remote storage init")?;
|
||||
|
||||
let properties = WriterProperties::builder()
|
||||
.set_data_page_size_limit(config.parquet_upload_page_size)
|
||||
.set_compression(config.parquet_upload_compression);
|
||||
@@ -220,18 +224,18 @@ pub async fn worker(
|
||||
let rx_disconnect = futures::stream::poll_fn(move |cx| rx_disconnect.poll_recv(cx));
|
||||
let rx_disconnect = rx_disconnect.map(RequestData::from);
|
||||
|
||||
let storage_disconnect =
|
||||
GenericRemoteStorage::from_config(&disconnect_events_storage_config)
|
||||
.await
|
||||
.context("remote storage for disconnect events init")?;
|
||||
let parquet_config_disconnect = parquet_config.clone();
|
||||
tokio::try_join!(
|
||||
worker_inner(remote_storage_config, rx, parquet_config),
|
||||
worker_inner(
|
||||
disconnect_events_storage_config,
|
||||
rx_disconnect,
|
||||
parquet_config_disconnect
|
||||
)
|
||||
worker_inner(storage, rx, parquet_config),
|
||||
worker_inner(storage_disconnect, rx_disconnect, parquet_config_disconnect)
|
||||
)
|
||||
.map(|_| ())
|
||||
} else {
|
||||
worker_inner(remote_storage_config, rx, parquet_config).await
|
||||
worker_inner(storage, rx, parquet_config).await
|
||||
}
|
||||
}
|
||||
|
||||
@@ -247,32 +251,18 @@ struct ParquetConfig {
|
||||
test_remote_failures: u64,
|
||||
}
|
||||
|
||||
impl ParquetConfig {
|
||||
async fn storage(
|
||||
&self,
|
||||
storage_config: &RemoteStorageConfig,
|
||||
) -> anyhow::Result<GenericRemoteStorage> {
|
||||
let storage = GenericRemoteStorage::from_config(storage_config)
|
||||
.await
|
||||
.context("remote storage init")?;
|
||||
|
||||
#[cfg(any(test, feature = "testing"))]
|
||||
if self.test_remote_failures > 0 {
|
||||
return Ok(GenericRemoteStorage::unreliable_wrapper(
|
||||
storage,
|
||||
self.test_remote_failures,
|
||||
));
|
||||
}
|
||||
|
||||
Ok(storage)
|
||||
}
|
||||
}
|
||||
|
||||
async fn worker_inner(
|
||||
storage_config: RemoteStorageConfig,
|
||||
storage: GenericRemoteStorage,
|
||||
rx: impl Stream<Item = RequestData>,
|
||||
config: ParquetConfig,
|
||||
) -> anyhow::Result<()> {
|
||||
#[cfg(any(test, feature = "testing"))]
|
||||
let storage = if config.test_remote_failures > 0 {
|
||||
GenericRemoteStorage::unreliable_wrapper(storage, config.test_remote_failures)
|
||||
} else {
|
||||
storage
|
||||
};
|
||||
|
||||
let mut rx = std::pin::pin!(rx);
|
||||
|
||||
let mut rows = Vec::with_capacity(config.rows_per_group);
|
||||
@@ -295,7 +285,7 @@ async fn worker_inner(
|
||||
}
|
||||
if len > config.file_size || force {
|
||||
last_upload = time::Instant::now();
|
||||
let file = upload_parquet(w, len, &storage_config, &config).await?;
|
||||
let file = upload_parquet(w, len, &storage).await?;
|
||||
w = SerializedFileWriter::new(file, schema.clone(), config.propeties.clone())?;
|
||||
len = 0;
|
||||
}
|
||||
@@ -308,7 +298,7 @@ async fn worker_inner(
|
||||
}
|
||||
|
||||
if !w.flushed_row_groups().is_empty() {
|
||||
let _rtchk: Writer<BytesMut> = upload_parquet(w, len, &storage_config, &config).await?;
|
||||
let _rtchk: Writer<BytesMut> = upload_parquet(w, len, &storage).await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
@@ -350,8 +340,7 @@ where
|
||||
async fn upload_parquet(
|
||||
mut w: SerializedFileWriter<Writer<BytesMut>>,
|
||||
len: i64,
|
||||
storage_config: &RemoteStorageConfig,
|
||||
config: &ParquetConfig,
|
||||
storage: &GenericRemoteStorage,
|
||||
) -> anyhow::Result<Writer<BytesMut>> {
|
||||
let len_uncompressed = w
|
||||
.flushed_row_groups()
|
||||
@@ -388,15 +377,6 @@ async fn upload_parquet(
|
||||
size, compression, "uploading request parquet file"
|
||||
);
|
||||
|
||||
// A bug in azure-sdk means that the identity-token-file that expires after
|
||||
// 1 hour is not refreshed. This identity-token is used to fetch the actual azure storage
|
||||
// tokens that last for 24 hours. After this 24 hour period, azure-sdk tries to refresh
|
||||
// the storage token, but the identity token has now expired.
|
||||
// <https://github.com/Azure/azure-sdk-for-rust/issues/1739>
|
||||
//
|
||||
// To work around this, we recreate the storage every time.
|
||||
let storage = config.storage(storage_config).await?;
|
||||
|
||||
let year = now.year();
|
||||
let month = now.month();
|
||||
let day = now.day();
|
||||
@@ -451,8 +431,8 @@ mod tests {
|
||||
use rand::rngs::StdRng;
|
||||
use rand::{Rng, SeedableRng};
|
||||
use remote_storage::{
|
||||
RemoteStorageConfig, RemoteStorageKind, S3Config, DEFAULT_MAX_KEYS_PER_LIST_RESPONSE,
|
||||
DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT,
|
||||
GenericRemoteStorage, RemoteStorageConfig, RemoteStorageKind, S3Config,
|
||||
DEFAULT_MAX_KEYS_PER_LIST_RESPONSE, DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT,
|
||||
};
|
||||
use tokio::sync::mpsc;
|
||||
use tokio::time;
|
||||
@@ -579,11 +559,12 @@ mod tests {
|
||||
timeout: std::time::Duration::from_secs(120),
|
||||
small_timeout: std::time::Duration::from_secs(30),
|
||||
};
|
||||
|
||||
worker_inner(remote_storage_config, rx, config)
|
||||
let storage = GenericRemoteStorage::from_config(&remote_storage_config)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
worker_inner(storage, rx, config).await.unwrap();
|
||||
|
||||
let mut files = WalkDir::new(tmpdir.as_std_path())
|
||||
.into_iter()
|
||||
.filter_map(|entry| entry.ok())
|
||||
|
||||
@@ -400,9 +400,9 @@ fn create_random_jwk() -> (SigningKey, jose_jwk::Key) {
|
||||
pub(crate) enum HttpConnError {
|
||||
#[error("pooled connection closed at inconsistent state")]
|
||||
ConnectionClosedAbruptly(#[from] tokio::sync::watch::error::SendError<uuid::Uuid>),
|
||||
#[error("could not connection to postgres in compute")]
|
||||
#[error("could not connect to postgres in compute")]
|
||||
PostgresConnectionError(#[from] postgres_client::Error),
|
||||
#[error("could not connection to local-proxy in compute")]
|
||||
#[error("could not connect to local-proxy in compute")]
|
||||
LocalProxyConnectionError(#[from] LocalProxyConnError),
|
||||
#[error("could not parse JWT payload")]
|
||||
JwtPayloadError(serde_json::Error),
|
||||
|
||||
@@ -11,10 +11,12 @@ use http_body_util::{BodyExt, Full};
|
||||
use hyper::body::Incoming;
|
||||
use hyper::http::{HeaderName, HeaderValue};
|
||||
use hyper::{header, HeaderMap, Request, Response, StatusCode};
|
||||
use indexmap::IndexMap;
|
||||
use postgres_client::error::{DbError, ErrorPosition, SqlState};
|
||||
use postgres_client::{GenericClient, IsolationLevel, NoTls, ReadyForQueryStatus, Transaction};
|
||||
use pq_proto::StartupMessageParamsBuilder;
|
||||
use serde::Serialize;
|
||||
use serde_json::value::RawValue;
|
||||
use serde_json::Value;
|
||||
use tokio::time::{self, Instant};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
@@ -249,6 +251,50 @@ pub(crate) async fn handle(
|
||||
let mut response = match result {
|
||||
Ok(r) => {
|
||||
ctx.set_success();
|
||||
|
||||
// Handling the error response from local proxy here
|
||||
if config.authentication_config.is_auth_broker && r.status().is_server_error() {
|
||||
let status = r.status();
|
||||
|
||||
let body_bytes = r
|
||||
.collect()
|
||||
.await
|
||||
.map_err(|e| {
|
||||
ApiError::InternalServerError(anyhow::Error::msg(format!(
|
||||
"could not collect http body: {e}"
|
||||
)))
|
||||
})?
|
||||
.to_bytes();
|
||||
|
||||
if let Ok(mut json_map) =
|
||||
serde_json::from_slice::<IndexMap<&str, &RawValue>>(&body_bytes)
|
||||
{
|
||||
let message = json_map.get("message");
|
||||
if let Some(message) = message {
|
||||
let msg: String = match serde_json::from_str(message.get()) {
|
||||
Ok(msg) => msg,
|
||||
Err(_) => {
|
||||
"Unable to parse the response message from server".to_string()
|
||||
}
|
||||
};
|
||||
|
||||
error!("Error response from local_proxy: {status} {msg}");
|
||||
|
||||
json_map.retain(|key, _| !key.starts_with("neon:")); // remove all the neon-related keys
|
||||
|
||||
let resp_json = serde_json::to_string(&json_map)
|
||||
.unwrap_or("failed to serialize the response message".to_string());
|
||||
|
||||
return json_response(status, resp_json);
|
||||
}
|
||||
}
|
||||
|
||||
error!("Unable to parse the response message from local_proxy");
|
||||
return json_response(
|
||||
status,
|
||||
json!({ "message": "Unable to parse the response message from server".to_string() }),
|
||||
);
|
||||
}
|
||||
r
|
||||
}
|
||||
Err(e @ SqlOverHttpError::Cancelled(_)) => {
|
||||
@@ -618,8 +664,6 @@ async fn handle_db_inner(
|
||||
|
||||
let authenticate_and_connect = Box::pin(
|
||||
async {
|
||||
let is_local_proxy = matches!(backend.auth_backend, crate::auth::Backend::Local(_));
|
||||
|
||||
let keys = match auth {
|
||||
AuthData::Password(pw) => {
|
||||
backend
|
||||
@@ -634,7 +678,9 @@ async fn handle_db_inner(
|
||||
};
|
||||
|
||||
let client = match keys.keys {
|
||||
ComputeCredentialKeys::JwtPayload(payload) if is_local_proxy => {
|
||||
ComputeCredentialKeys::JwtPayload(payload)
|
||||
if backend.auth_backend.is_local_proxy() =>
|
||||
{
|
||||
let mut client = backend.connect_to_local_postgres(ctx, conn_info).await?;
|
||||
let (cli_inner, _dsc) = client.client_inner();
|
||||
cli_inner.set_jwt_session(&payload).await?;
|
||||
|
||||
@@ -592,6 +592,8 @@ impl Timeline {
|
||||
assert!(self.cancel.is_cancelled());
|
||||
assert!(self.gate.close_complete());
|
||||
|
||||
info!("deleting timeline {} from disk", self.ttid);
|
||||
|
||||
// Close associated FDs. Nobody will be able to touch timeline data once
|
||||
// it is cancelled, so WAL storage won't be opened again.
|
||||
shared_state.sk.close_wal_store();
|
||||
|
||||
@@ -475,6 +475,8 @@ impl GlobalTimelines {
|
||||
info!("deleting timeline {}, only_local={}", ttid, only_local);
|
||||
timeline.shutdown().await;
|
||||
|
||||
info!("timeline {ttid} shut down for deletion");
|
||||
|
||||
// Take a lock and finish the deletion holding this mutex.
|
||||
let mut shared_state = timeline.write_shared_state().await;
|
||||
|
||||
|
||||
@@ -3345,7 +3345,7 @@ class NeonProxy(PgProtocol):
|
||||
metric_collection_interval: str | None = None,
|
||||
):
|
||||
host = "127.0.0.1"
|
||||
domain = "proxy.localtest.me" # resolves to 127.0.0.1
|
||||
domain = "proxy.local.neon.build" # resolves to 127.0.0.1
|
||||
super().__init__(dsn=auth_backend.default_conn_url, host=domain, port=proxy_port)
|
||||
|
||||
self.domain = domain
|
||||
@@ -3368,7 +3368,7 @@ class NeonProxy(PgProtocol):
|
||||
# generate key of it doesn't exist
|
||||
crt_path = self.test_output_dir / "proxy.crt"
|
||||
key_path = self.test_output_dir / "proxy.key"
|
||||
generate_proxy_tls_certs("*.localtest.me", key_path, crt_path)
|
||||
generate_proxy_tls_certs("*.local.neon.build", key_path, crt_path)
|
||||
|
||||
args = [
|
||||
str(self.neon_binpath / "proxy"),
|
||||
@@ -3569,7 +3569,7 @@ class NeonAuthBroker:
|
||||
external_http_port: int,
|
||||
auth_backend: NeonAuthBroker.ProxyV1,
|
||||
):
|
||||
self.domain = "apiauth.localtest.me" # resolves to 127.0.0.1
|
||||
self.domain = "apiauth.local.neon.build" # resolves to 127.0.0.1
|
||||
self.host = "127.0.0.1"
|
||||
self.http_port = http_port
|
||||
self.external_http_port = external_http_port
|
||||
@@ -3586,7 +3586,7 @@ class NeonAuthBroker:
|
||||
# generate key of it doesn't exist
|
||||
crt_path = self.test_output_dir / "proxy.crt"
|
||||
key_path = self.test_output_dir / "proxy.key"
|
||||
generate_proxy_tls_certs("apiauth.localtest.me", key_path, crt_path)
|
||||
generate_proxy_tls_certs("apiauth.local.neon.build", key_path, crt_path)
|
||||
|
||||
args = [
|
||||
str(self.neon_binpath / "proxy"),
|
||||
@@ -5122,12 +5122,14 @@ def wait_for_last_flush_lsn(
|
||||
timeline: TimelineId,
|
||||
pageserver_id: int | None = None,
|
||||
auth_token: str | None = None,
|
||||
last_flush_lsn: Lsn | None = None,
|
||||
) -> Lsn:
|
||||
"""Wait for pageserver to catch up the latest flush LSN, returns the last observed lsn."""
|
||||
|
||||
shards = tenant_get_shards(env, tenant, pageserver_id)
|
||||
|
||||
last_flush_lsn = Lsn(endpoint.safe_psql("SELECT pg_current_wal_flush_lsn()")[0][0])
|
||||
if last_flush_lsn is None:
|
||||
last_flush_lsn = Lsn(endpoint.safe_psql("SELECT pg_current_wal_flush_lsn()")[0][0])
|
||||
|
||||
results = []
|
||||
for tenant_shard_id, pageserver in shards:
|
||||
|
||||
@@ -282,18 +282,35 @@ class S3Storage:
|
||||
def timeline_path(self, tenant_id: TenantShardId | TenantId, timeline_id: TimelineId) -> str:
|
||||
return f"{self.tenant_path(tenant_id)}/timelines/{timeline_id}"
|
||||
|
||||
def get_latest_generation_key(self, prefix: str, suffix: str, keys: list[str]) -> str:
|
||||
"""
|
||||
Gets the latest generation key from a list of keys.
|
||||
|
||||
@param index_keys: A list of keys of different generations, which start with `prefix`
|
||||
"""
|
||||
|
||||
def parse_gen(key: str) -> int:
|
||||
shortname = key.split("/")[-1]
|
||||
generation_str = shortname.removeprefix(prefix).removesuffix(suffix)
|
||||
try:
|
||||
return int(generation_str, base=16)
|
||||
except ValueError:
|
||||
log.info(f"Ignoring non-matching key: {key}")
|
||||
return -1
|
||||
|
||||
if len(keys) == 0:
|
||||
raise IndexError("No keys found")
|
||||
|
||||
return max(keys, key=parse_gen)
|
||||
|
||||
def get_latest_index_key(self, index_keys: list[str]) -> str:
|
||||
"""
|
||||
Gets the latest index file key.
|
||||
|
||||
@param index_keys: A list of index keys of different generations.
|
||||
"""
|
||||
|
||||
def parse_gen(index_key: str) -> int:
|
||||
parts = index_key.split("index_part.json-")
|
||||
return int(parts[-1], base=16) if len(parts) == 2 else -1
|
||||
|
||||
return max(index_keys, key=parse_gen)
|
||||
key = self.get_latest_generation_key(prefix="index_part.json-", suffix="", keys=index_keys)
|
||||
return key
|
||||
|
||||
def download_index_part(self, index_key: str) -> IndexPartDump:
|
||||
"""
|
||||
@@ -306,6 +323,29 @@ class S3Storage:
|
||||
log.info(f"index_part.json: {body}")
|
||||
return IndexPartDump.from_json(json.loads(body))
|
||||
|
||||
def download_tenant_manifest(self, tenant_id: TenantId) -> dict[str, Any] | None:
|
||||
tenant_prefix = self.tenant_path(tenant_id)
|
||||
|
||||
objects = self.client.list_objects_v2(Bucket=self.bucket_name, Prefix=f"{tenant_prefix}/")[
|
||||
"Contents"
|
||||
]
|
||||
keys = [obj["Key"] for obj in objects if obj["Key"].find("tenant-manifest") != -1]
|
||||
try:
|
||||
manifest_key = self.get_latest_generation_key("tenant-manifest-", ".json", keys)
|
||||
except IndexError:
|
||||
log.info(
|
||||
f"No manifest found for tenant {tenant_id}, this is normal if it didn't offload anything yet"
|
||||
)
|
||||
return None
|
||||
|
||||
response = self.client.get_object(Bucket=self.bucket_name, Key=manifest_key)
|
||||
body = response["Body"].read().decode("utf-8")
|
||||
log.info(f"Downloaded manifest {manifest_key}: {body}")
|
||||
|
||||
manifest = json.loads(body)
|
||||
assert isinstance(manifest, dict)
|
||||
return manifest
|
||||
|
||||
def heatmap_key(self, tenant_id: TenantId) -> str:
|
||||
return f"{self.tenant_path(tenant_id)}/{TENANT_HEATMAP_FILE_NAME}"
|
||||
|
||||
|
||||
@@ -76,6 +76,9 @@ def test_ingest_logical_message(
|
||||
log.info("Waiting for Pageserver to catch up")
|
||||
wait_for_last_record_lsn(client, env.initial_tenant, env.initial_timeline, end_lsn)
|
||||
|
||||
recover_to_lsn = Lsn(endpoint.safe_psql("select pg_current_wal_lsn()")[0][0])
|
||||
endpoint.stop()
|
||||
|
||||
# Now that all data is ingested, delete and recreate the tenant in the pageserver. This will
|
||||
# reingest all the WAL from the safekeeper without any other constraints. This gives us a
|
||||
# baseline of how fast the pageserver can ingest this WAL in isolation.
|
||||
@@ -88,7 +91,13 @@ def test_ingest_logical_message(
|
||||
with zenbenchmark.record_duration("pageserver_recover_ingest"):
|
||||
log.info("Recovering WAL into pageserver")
|
||||
client.timeline_create(env.pg_version, env.initial_tenant, env.initial_timeline)
|
||||
wait_for_last_flush_lsn(env, endpoint, env.initial_tenant, env.initial_timeline)
|
||||
wait_for_last_flush_lsn(
|
||||
env, endpoint, env.initial_tenant, env.initial_timeline, last_flush_lsn=recover_to_lsn
|
||||
)
|
||||
|
||||
# Check endpoint can start, i.e. we really recovered
|
||||
endpoint.start()
|
||||
wait_for_last_flush_lsn(env, endpoint, env.initial_tenant, env.initial_timeline)
|
||||
|
||||
# Emit metrics.
|
||||
wal_written_mb = round((end_lsn - start_lsn) / (1024 * 1024))
|
||||
|
||||
@@ -474,6 +474,14 @@ HISTORIC_DATA_SETS = [
|
||||
PgVersion.V16,
|
||||
"https://neon-github-public-dev.s3.eu-central-1.amazonaws.com/compatibility-data-snapshots/2024-07-18-pgv16.tar.zst",
|
||||
),
|
||||
# This dataset created on a pageserver running modern code at time of capture, but configured with no generation. This
|
||||
# is our regression test that we can load data written without generations in layer file names & indices
|
||||
HistoricDataSet(
|
||||
"2025-02-07-nogenerations",
|
||||
TenantId("e1411ca6562d6ff62419f693a5695d67"),
|
||||
PgVersion.V17,
|
||||
"https://neon-github-public-dev.s3.eu-central-1.amazonaws.com/compatibility-data-snapshots/2025-02-07-pgv17-nogenerations.tar.zst",
|
||||
),
|
||||
]
|
||||
|
||||
|
||||
|
||||
@@ -82,7 +82,7 @@ def test_compute_catalog(neon_simple_env: NeonEnv):
|
||||
ddl = client.database_schema(database=test_db["name"])
|
||||
|
||||
# Check that it looks like a valid PostgreSQL dump
|
||||
assert "-- PostgreSQL database dump" in ddl
|
||||
assert "-- PostgreSQL database dump complete" in ddl
|
||||
|
||||
# Check that it doesn't contain health_check and migration traces.
|
||||
# They are only created in system `postgres` database, so by checking
|
||||
|
||||
@@ -20,6 +20,9 @@ from fixtures.remote_storage import LocalFsStorage, RemoteStorageKind
|
||||
from fixtures.utils import query_scalar, wait_until
|
||||
|
||||
|
||||
@pytest.mark.skip(
|
||||
reason="We won't create future layers any more after https://github.com/neondatabase/neon/pull/10548"
|
||||
)
|
||||
@pytest.mark.parametrize(
|
||||
"attach_mode",
|
||||
["default_generation", "same_generation"],
|
||||
|
||||
@@ -12,7 +12,6 @@ of the pageserver are:
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
import re
|
||||
import time
|
||||
from enum import StrEnum
|
||||
|
||||
@@ -29,7 +28,6 @@ from fixtures.pageserver.common_types import parse_layer_file_name
|
||||
from fixtures.pageserver.http import PageserverApiException
|
||||
from fixtures.pageserver.utils import (
|
||||
assert_tenant_state,
|
||||
list_prefix,
|
||||
wait_for_last_record_lsn,
|
||||
wait_for_upload,
|
||||
)
|
||||
@@ -124,109 +122,6 @@ def assert_deletion_queue(ps_http, size_fn) -> None:
|
||||
assert size_fn(v) is True
|
||||
|
||||
|
||||
def test_generations_upgrade(neon_env_builder: NeonEnvBuilder):
|
||||
"""
|
||||
Validate behavior when a pageserver is run without generation support enabled,
|
||||
then started again after activating it:
|
||||
- Before upgrade, no objects should have generation suffixes
|
||||
- After upgrade, the bucket should contain a mixture.
|
||||
- In both cases, postgres I/O should work.
|
||||
"""
|
||||
neon_env_builder.enable_pageserver_remote_storage(
|
||||
RemoteStorageKind.MOCK_S3,
|
||||
)
|
||||
|
||||
env = neon_env_builder.init_configs()
|
||||
env.broker.start()
|
||||
for sk in env.safekeepers:
|
||||
sk.start()
|
||||
env.storage_controller.start()
|
||||
|
||||
# We will start a pageserver with no control_plane_api set, so it won't be able to self-register
|
||||
env.storage_controller.node_register(env.pageserver)
|
||||
|
||||
def remove_control_plane_api_field(config):
|
||||
return config.pop("control_plane_api")
|
||||
|
||||
control_plane_api = env.pageserver.edit_config_toml(remove_control_plane_api_field)
|
||||
env.pageserver.start()
|
||||
env.storage_controller.node_configure(env.pageserver.id, {"availability": "Active"})
|
||||
|
||||
env.create_tenant(
|
||||
tenant_id=env.initial_tenant, conf=TENANT_CONF, timeline_id=env.initial_timeline
|
||||
)
|
||||
|
||||
generate_uploads_and_deletions(env, pageserver=env.pageserver)
|
||||
|
||||
def parse_generation_suffix(key):
|
||||
m = re.match(".+-([0-9a-zA-Z]{8})$", key)
|
||||
if m is None:
|
||||
return None
|
||||
else:
|
||||
log.info(f"match: {m}")
|
||||
log.info(f"group: {m.group(1)}")
|
||||
return int(m.group(1), 16)
|
||||
|
||||
assert neon_env_builder.pageserver_remote_storage is not None
|
||||
pre_upgrade_keys = list(
|
||||
[
|
||||
o["Key"]
|
||||
for o in list_prefix(neon_env_builder.pageserver_remote_storage, delimiter="")[
|
||||
"Contents"
|
||||
]
|
||||
]
|
||||
)
|
||||
for key in pre_upgrade_keys:
|
||||
assert parse_generation_suffix(key) is None
|
||||
|
||||
env.pageserver.stop()
|
||||
# Starting without the override that disabled control_plane_api
|
||||
env.pageserver.patch_config_toml_nonrecursive(
|
||||
{
|
||||
"control_plane_api": control_plane_api,
|
||||
}
|
||||
)
|
||||
env.pageserver.start()
|
||||
|
||||
generate_uploads_and_deletions(env, pageserver=env.pageserver, init=False)
|
||||
|
||||
legacy_objects: list[str] = []
|
||||
suffixed_objects = []
|
||||
post_upgrade_keys = list(
|
||||
[
|
||||
o["Key"]
|
||||
for o in list_prefix(neon_env_builder.pageserver_remote_storage, delimiter="")[
|
||||
"Contents"
|
||||
]
|
||||
]
|
||||
)
|
||||
for key in post_upgrade_keys:
|
||||
log.info(f"post-upgrade key: {key}")
|
||||
if parse_generation_suffix(key) is not None:
|
||||
suffixed_objects.append(key)
|
||||
else:
|
||||
legacy_objects.append(key)
|
||||
|
||||
# Bucket now contains a mixture of suffixed and non-suffixed objects
|
||||
assert len(suffixed_objects) > 0
|
||||
assert len(legacy_objects) > 0
|
||||
|
||||
# Flush through deletions to get a clean state for scrub: we are implicitly validating
|
||||
# that our generations-enabled pageserver was able to do deletions of layers
|
||||
# from earlier which don't have a generation.
|
||||
env.pageserver.http_client().deletion_queue_flush(execute=True)
|
||||
|
||||
assert get_deletion_queue_unexpected_errors(env.pageserver.http_client()) == 0
|
||||
|
||||
# Having written a mixture of generation-aware and legacy index_part.json,
|
||||
# ensure the scrubber handles the situation as expected.
|
||||
healthy, metadata_summary = env.storage_scrubber.scan_metadata()
|
||||
assert metadata_summary["tenant_count"] == 1 # Scrubber should have seen our timeline
|
||||
assert metadata_summary["timeline_count"] == 1
|
||||
assert metadata_summary["timeline_shard_count"] == 1
|
||||
assert healthy
|
||||
|
||||
|
||||
def test_deferred_deletion(neon_env_builder: NeonEnvBuilder):
|
||||
neon_env_builder.enable_pageserver_remote_storage(
|
||||
RemoteStorageKind.MOCK_S3,
|
||||
|
||||
@@ -57,7 +57,7 @@ def test_proxy_select_1(static_proxy: NeonProxy):
|
||||
assert out[0][0] == 1
|
||||
|
||||
# with SNI
|
||||
out = static_proxy.safe_psql("select 42", host="generic-project-name.localtest.me")
|
||||
out = static_proxy.safe_psql("select 42", host="generic-project-name.local.neon.build")
|
||||
assert out[0][0] == 42
|
||||
|
||||
|
||||
@@ -234,7 +234,7 @@ def test_sql_over_http_serverless_driver(static_proxy: NeonProxy):
|
||||
|
||||
connstr = f"postgresql://http:http@{static_proxy.domain}:{static_proxy.proxy_port}/postgres"
|
||||
response = requests.post(
|
||||
f"https://api.localtest.me:{static_proxy.external_http_port}/sql",
|
||||
f"https://api.local.neon.build:{static_proxy.external_http_port}/sql",
|
||||
data=json.dumps({"query": "select 42 as answer", "params": []}),
|
||||
headers={"Content-Type": "application/sql", "Neon-Connection-String": connstr},
|
||||
verify=str(static_proxy.test_output_dir / "proxy.crt"),
|
||||
|
||||
@@ -35,7 +35,7 @@ async def test_proxy_psql_allowed_ips(static_proxy: NeonProxy, vanilla_pg: Vanil
|
||||
check_cannot_connect(query="select 1", sslsni=0, options="endpoint=private-project")
|
||||
|
||||
# with SNI
|
||||
check_cannot_connect(query="select 1", host="private-project.localtest.me")
|
||||
check_cannot_connect(query="select 1", host="private-project.local.neon.build")
|
||||
|
||||
# no SNI, deprecated `options=project` syntax (before we had several endpoint in project)
|
||||
out = static_proxy.safe_psql(query="select 1", sslsni=0, options="project=generic-project")
|
||||
@@ -46,7 +46,7 @@ async def test_proxy_psql_allowed_ips(static_proxy: NeonProxy, vanilla_pg: Vanil
|
||||
assert out[0][0] == 1
|
||||
|
||||
# with SNI
|
||||
out = static_proxy.safe_psql(query="select 1", host="generic-project.localtest.me")
|
||||
out = static_proxy.safe_psql(query="select 1", host="generic-project.local.neon.build")
|
||||
assert out[0][0] == 1
|
||||
|
||||
|
||||
|
||||
@@ -116,7 +116,7 @@ def test_pg_sni_router(
|
||||
test_output_dir: Path,
|
||||
):
|
||||
generate_tls_cert(
|
||||
"endpoint.namespace.localtest.me",
|
||||
"endpoint.namespace.local.neon.build",
|
||||
test_output_dir / "router.crt",
|
||||
test_output_dir / "router.key",
|
||||
)
|
||||
@@ -130,7 +130,7 @@ def test_pg_sni_router(
|
||||
with PgSniRouter(
|
||||
neon_binpath=neon_binpath,
|
||||
port=router_port,
|
||||
destination="localtest.me",
|
||||
destination="local.neon.build",
|
||||
tls_cert=test_output_dir / "router.crt",
|
||||
tls_key=test_output_dir / "router.key",
|
||||
test_output_dir=test_output_dir,
|
||||
@@ -141,7 +141,7 @@ def test_pg_sni_router(
|
||||
"select 1",
|
||||
dbname="postgres",
|
||||
sslmode="require",
|
||||
host=f"endpoint--namespace--{pg_port}.localtest.me",
|
||||
host=f"endpoint--namespace--{pg_port}.local.neon.build",
|
||||
hostaddr="127.0.0.1",
|
||||
)
|
||||
assert out[0][0] == 1
|
||||
|
||||
@@ -554,8 +554,33 @@ def test_timeline_archival_chaos(neon_env_builder: NeonEnvBuilder):
|
||||
log.info(f"Timeline {state.timeline_id} is still active")
|
||||
shutdown.wait(0.5)
|
||||
elif state.timeline_id in offloaded_ids:
|
||||
log.info(f"Timeline {state.timeline_id} is now offloaded")
|
||||
state.offloaded = True
|
||||
log.info(f"Timeline {state.timeline_id} is now offloaded in memory")
|
||||
|
||||
# Hack: when we see something offloaded in the API, it doesn't guarantee that the offload
|
||||
# is persistent (it is marked offloaded first, then that is persisted to the tenant manifest).
|
||||
# So we wait until we see the manifest update before considering it offloaded, that way
|
||||
# subsequent checks that it doesn't revert to active on a restart will pass reliably.
|
||||
time.sleep(0.1)
|
||||
assert isinstance(env.pageserver_remote_storage, S3Storage)
|
||||
manifest = env.pageserver_remote_storage.download_tenant_manifest(
|
||||
tenant_id
|
||||
)
|
||||
if manifest is None:
|
||||
log.info(
|
||||
f"Timeline {state.timeline_id} is not yet offloaded persistently (no manifest)"
|
||||
)
|
||||
elif str(state.timeline_id) in [
|
||||
t["timeline_id"] for t in manifest["offloaded_timelines"]
|
||||
]:
|
||||
log.info(
|
||||
f"Timeline {state.timeline_id} is now offloaded persistently"
|
||||
)
|
||||
state.offloaded = True
|
||||
else:
|
||||
log.info(
|
||||
f"Timeline {state.timeline_id} is not yet offloaded persistently (manifest: {manifest})"
|
||||
)
|
||||
|
||||
break
|
||||
else:
|
||||
# Timeline is neither offloaded nor active, this is unexpected: the pageserver
|
||||
|
||||
@@ -13,12 +13,12 @@
|
||||
# postgres -D data -p3000
|
||||
#
|
||||
# ## Launch proxy with WSS enabled:
|
||||
# openssl req -new -x509 -days 365 -nodes -text -out server.crt -keyout server.key -subj '/CN=*.neon.localtest.me'
|
||||
# openssl req -new -x509 -days 365 -nodes -text -out server.crt -keyout server.key -subj '/CN=*.local.neon.build'
|
||||
# ./target/debug/proxy --wss 127.0.0.1:40433 --http 127.0.0.1:28080 --mgmt 127.0.0.1:9099 --proxy 127.0.0.1:4433 --tls-key server.key --tls-cert server.crt --auth-backend postgres
|
||||
#
|
||||
# ## Launch the tunnel:
|
||||
#
|
||||
# poetry run ./test_runner/websocket_tunnel.py --ws-port 40433 --ws-url "wss://ep-test.neon.localtest.me"
|
||||
# poetry run ./test_runner/websocket_tunnel.py --ws-port 40433 --ws-url "wss://ep-test.local.neon.build"
|
||||
#
|
||||
# ## Now you can connect with psql:
|
||||
# psql "postgresql://heikki@localhost:40433/postgres"
|
||||
|
||||
2
vendor/postgres-v16
vendored
2
vendor/postgres-v16
vendored
Submodule vendor/postgres-v16 updated: 86d9ea96eb...13cf5d06c9
2
vendor/postgres-v17
vendored
2
vendor/postgres-v17
vendored
Submodule vendor/postgres-v17 updated: 8dfd5a7030...4c45d78ad5
4
vendor/revisions.json
vendored
4
vendor/revisions.json
vendored
@@ -1,11 +1,11 @@
|
||||
{
|
||||
"v17": [
|
||||
"17.2",
|
||||
"8dfd5a7030d3e8a98b60265ebe045788892ac7f3"
|
||||
"4c45d78ad587e4bcb4a5a7ef6931b88c6a3d575d"
|
||||
],
|
||||
"v16": [
|
||||
"16.6",
|
||||
"86d9ea96ebb9088eac62f57f1f5ace68e70e0d1c"
|
||||
"13cf5d06c98a8e9b0590ce6cdfd193a08d0a7792"
|
||||
],
|
||||
"v15": [
|
||||
"15.10",
|
||||
|
||||
Reference in New Issue
Block a user