mirror of
https://github.com/neondatabase/neon.git
synced 2026-02-12 15:10:38 +00:00
Compare commits
56 Commits
diko/metri
...
luist18/pg
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
eb030fa5a5 | ||
|
|
18f1e152ba | ||
|
|
1f14a303d8 | ||
|
|
c051d33d11 | ||
|
|
d9aa901c6d | ||
|
|
8a2b19f467 | ||
|
|
486872dd28 | ||
|
|
d37e90f430 | ||
|
|
8eb701d706 | ||
|
|
85a515c176 | ||
|
|
aa88279681 | ||
|
|
b2a670c765 | ||
|
|
ad9655bb01 | ||
|
|
1a87975d95 | ||
|
|
417b2781d9 | ||
|
|
2841f1ffa5 | ||
|
|
aad410c8f1 | ||
|
|
4f94751b75 | ||
|
|
6ee84d985a | ||
|
|
295be03a33 | ||
|
|
8e1b5a9727 | ||
|
|
1ef4258f29 | ||
|
|
65e2aae6e4 | ||
|
|
edc874e1b3 | ||
|
|
181af302b5 | ||
|
|
497116b76d | ||
|
|
a917952b30 | ||
|
|
e581b670f4 | ||
|
|
8ed79ed773 | ||
|
|
381f42519e | ||
|
|
375df517a0 | ||
|
|
9db63fea7a | ||
|
|
bfc767d60d | ||
|
|
109c54a300 | ||
|
|
74920d8cd8 | ||
|
|
131b32ef48 | ||
|
|
581bb5d7d5 | ||
|
|
3c78133477 | ||
|
|
46e046e779 | ||
|
|
d8cee52637 | ||
|
|
2e11d129d0 | ||
|
|
43a7423f72 | ||
|
|
374736a4de | ||
|
|
5e507776bc | ||
|
|
4e8e0951be | ||
|
|
64a8d0c2e6 | ||
|
|
7602e6ffc0 | ||
|
|
17193d6a33 | ||
|
|
03ae57236f | ||
|
|
e3d27b2f68 | ||
|
|
dd1299f337 | ||
|
|
cb19e4e05d | ||
|
|
9df230c837 | ||
|
|
3c2bc5baba | ||
|
|
6667810800 | ||
|
|
47f5bcf2bc |
12
.github/scripts/generate_image_maps.py
vendored
12
.github/scripts/generate_image_maps.py
vendored
@@ -39,12 +39,18 @@ registries = {
|
||||
],
|
||||
}
|
||||
|
||||
release_branches = ["release", "release-proxy", "release-compute"]
|
||||
|
||||
outputs: dict[str, dict[str, list[str]]] = {}
|
||||
|
||||
target_tags = [target_tag, "latest"] if branch == "main" else [target_tag]
|
||||
target_stages = (
|
||||
["dev", "prod"] if branch in ["release", "release-proxy", "release-compute"] else ["dev"]
|
||||
target_tags = (
|
||||
[target_tag, "latest"]
|
||||
if branch == "main"
|
||||
else [target_tag, "released"]
|
||||
if branch in release_branches
|
||||
else [target_tag]
|
||||
)
|
||||
target_stages = ["dev", "prod"] if branch in release_branches else ["dev"]
|
||||
|
||||
for component_name, component_images in components.items():
|
||||
for stage in target_stages:
|
||||
|
||||
39
.github/scripts/push_with_image_map.py
vendored
39
.github/scripts/push_with_image_map.py
vendored
@@ -2,6 +2,9 @@ import json
|
||||
import os
|
||||
import subprocess
|
||||
|
||||
RED = "\033[91m"
|
||||
RESET = "\033[0m"
|
||||
|
||||
image_map = os.getenv("IMAGE_MAP")
|
||||
if not image_map:
|
||||
raise ValueError("IMAGE_MAP environment variable is not set")
|
||||
@@ -11,12 +14,32 @@ try:
|
||||
except json.JSONDecodeError as e:
|
||||
raise ValueError("Failed to parse IMAGE_MAP as JSON") from e
|
||||
|
||||
for source, targets in parsed_image_map.items():
|
||||
for target in targets:
|
||||
cmd = ["docker", "buildx", "imagetools", "create", "-t", target, source]
|
||||
print(f"Running: {' '.join(cmd)}")
|
||||
result = subprocess.run(cmd, text=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
|
||||
failures = []
|
||||
|
||||
if result.returncode != 0:
|
||||
print(f"Error: {result.stdout}")
|
||||
raise RuntimeError(f"Command failed: {' '.join(cmd)}")
|
||||
pending = [(source, target) for source, targets in parsed_image_map.items() for target in targets]
|
||||
|
||||
while len(pending) > 0:
|
||||
if len(failures) > 10:
|
||||
print("Error: more than 10 failures!")
|
||||
for failure in failures:
|
||||
print(f'"{failure[0]}" failed with the following output:')
|
||||
print(failure[1])
|
||||
raise RuntimeError("Retry limit reached.")
|
||||
|
||||
source, target = pending.pop(0)
|
||||
cmd = ["docker", "buildx", "imagetools", "create", "-t", target, source]
|
||||
print(f"Running: {' '.join(cmd)}")
|
||||
result = subprocess.run(cmd, text=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
|
||||
|
||||
if result.returncode != 0:
|
||||
failures.append((" ".join(cmd), result.stdout, target))
|
||||
pending.append((source, target))
|
||||
print(
|
||||
f"{RED}[RETRY]{RESET} Push failed for {target}. Retrying... (failure count: {len(failures)})"
|
||||
)
|
||||
print(result.stdout)
|
||||
|
||||
if len(failures) > 0 and (github_output := os.getenv("GITHUB_OUTPUT")):
|
||||
failed_targets = [target for _, _, target in failures]
|
||||
with open(github_output, "a") as f:
|
||||
f.write(f"push_failures={json.dumps(failed_targets)}\n")
|
||||
|
||||
@@ -104,6 +104,25 @@ jobs:
|
||||
password: ${{ secrets.NEON_DOCKERHUB_PASSWORD }}
|
||||
|
||||
- name: Copy docker images to target registries
|
||||
id: push
|
||||
run: python3 .github/scripts/push_with_image_map.py
|
||||
env:
|
||||
IMAGE_MAP: ${{ inputs.image-map }}
|
||||
|
||||
- name: Notify Slack if container image pushing fails
|
||||
if: steps.push.outputs.push_failures || failure()
|
||||
uses: slackapi/slack-github-action@485a9d42d3a73031f12ec201c457e2162c45d02d # v2.0.0
|
||||
with:
|
||||
method: chat.postMessage
|
||||
token: ${{ secrets.SLACK_BOT_TOKEN }}
|
||||
payload: |
|
||||
channel: ${{ vars.SLACK_ON_CALL_DEVPROD_STREAM }}
|
||||
text: >
|
||||
*Container image pushing ${{
|
||||
steps.push.outcome == 'failure' && 'failed completely' || 'succeeded with some retries'
|
||||
}}* in
|
||||
<${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}|GitHub Run>
|
||||
|
||||
${{ steps.push.outputs.push_failures && format(
|
||||
'*Failed targets:*\n• {0}', join(fromJson(steps.push.outputs.push_failures), '\n• ')
|
||||
) || '' }}
|
||||
|
||||
27
.github/workflows/build_and_test.yml
vendored
27
.github/workflows/build_and_test.yml
vendored
@@ -89,8 +89,8 @@ jobs:
|
||||
|
||||
check-codestyle-python:
|
||||
needs: [ meta, check-permissions, build-build-tools-image ]
|
||||
# No need to run on `main` because we this in the merge queue
|
||||
if: ${{ needs.meta.outputs.run-kind == 'pr' }}
|
||||
# No need to run on `main` because we this in the merge queue. We do need to run this in `.*-rc-pr` because of hotfixes.
|
||||
if: ${{ contains(fromJSON('["pr", "storage-rc-pr", "proxy-rc-pr", "compute-rc-pr"]'), needs.meta.outputs.run-kind) }}
|
||||
uses: ./.github/workflows/_check-codestyle-python.yml
|
||||
with:
|
||||
build-tools-image: ${{ needs.build-build-tools-image.outputs.image }}-bookworm
|
||||
@@ -98,7 +98,8 @@ jobs:
|
||||
|
||||
check-codestyle-jsonnet:
|
||||
needs: [ meta, check-permissions, build-build-tools-image ]
|
||||
if: ${{ contains(fromJSON('["pr", "push-main"]'), needs.meta.outputs.run-kind) }}
|
||||
# We do need to run this in `.*-rc-pr` because of hotfixes.
|
||||
if: ${{ contains(fromJSON('["pr", "push-main", "storage-rc-pr", "proxy-rc-pr", "compute-rc-pr"]'), needs.meta.outputs.run-kind) }}
|
||||
runs-on: [ self-hosted, small ]
|
||||
container:
|
||||
image: ${{ needs.build-build-tools-image.outputs.image }}
|
||||
@@ -181,8 +182,8 @@ jobs:
|
||||
|
||||
check-codestyle-rust:
|
||||
needs: [ meta, check-permissions, build-build-tools-image ]
|
||||
# No need to run on `main` because we this in the merge queue
|
||||
if: ${{ needs.meta.outputs.run-kind == 'pr' }}
|
||||
# No need to run on `main` because we this in the merge queue. We do need to run this in `.*-rc-pr` because of hotfixes.
|
||||
if: ${{ contains(fromJSON('["pr", "storage-rc-pr", "proxy-rc-pr", "compute-rc-pr"]'), needs.meta.outputs.run-kind) }}
|
||||
uses: ./.github/workflows/_check-codestyle-rust.yml
|
||||
with:
|
||||
build-tools-image: ${{ needs.build-build-tools-image.outputs.image }}-bookworm
|
||||
@@ -191,7 +192,8 @@ jobs:
|
||||
|
||||
check-dependencies-rust:
|
||||
needs: [ meta, files-changed, build-build-tools-image ]
|
||||
if: ${{ needs.files-changed.outputs.check-rust-dependencies == 'true' && needs.meta.outputs.run-kind == 'pr' }}
|
||||
# No need to run on `main` because we this in the merge queue. We do need to run this in `.*-rc-pr` because of hotfixes.
|
||||
if: ${{ needs.files-changed.outputs.check-rust-dependencies == 'true' && contains(fromJSON('["pr", "storage-rc-pr", "proxy-rc-pr", "compute-rc-pr"]'), needs.meta.outputs.run-kind) }}
|
||||
uses: ./.github/workflows/cargo-deny.yml
|
||||
with:
|
||||
build-tools-image: ${{ needs.build-build-tools-image.outputs.image }}-bookworm
|
||||
@@ -199,7 +201,8 @@ jobs:
|
||||
|
||||
build-and-test-locally:
|
||||
needs: [ meta, build-build-tools-image ]
|
||||
if: ${{ contains(fromJSON('["pr", "push-main"]'), needs.meta.outputs.run-kind) }}
|
||||
# We do need to run this in `.*-rc-pr` because of hotfixes.
|
||||
if: ${{ contains(fromJSON('["pr", "push-main", "storage-rc-pr", "proxy-rc-pr", "compute-rc-pr"]'), needs.meta.outputs.run-kind) }}
|
||||
strategy:
|
||||
fail-fast: false
|
||||
matrix:
|
||||
@@ -977,7 +980,7 @@ jobs:
|
||||
TEST_EXTENSIONS_TAG: >-
|
||||
${{
|
||||
contains(fromJSON('["storage-rc-pr", "proxy-rc-pr"]'), needs.meta.outputs.run-kind)
|
||||
&& 'latest'
|
||||
&& needs.meta.outputs.previous-compute-release
|
||||
|| needs.meta.outputs.build-tag
|
||||
}}
|
||||
TEST_VERSION_ONLY: ${{ matrix.pg_version }}
|
||||
@@ -1565,10 +1568,10 @@ jobs:
|
||||
if: |
|
||||
contains(needs.*.result, 'failure')
|
||||
|| contains(needs.*.result, 'cancelled')
|
||||
|| (needs.check-dependencies-rust.result == 'skipped' && needs.files-changed.outputs.check-rust-dependencies == 'true' && needs.meta.outputs.run-kind == 'pr')
|
||||
|| (needs.build-and-test-locally.result == 'skipped' && needs.meta.outputs.run-kind == 'pr')
|
||||
|| (needs.check-codestyle-python.result == 'skipped' && needs.meta.outputs.run-kind == 'pr')
|
||||
|| (needs.check-codestyle-rust.result == 'skipped' && needs.meta.outputs.run-kind == 'pr')
|
||||
|| (needs.check-dependencies-rust.result == 'skipped' && needs.files-changed.outputs.check-rust-dependencies == 'true' && contains(fromJSON('["pr", "storage-rc-pr", "proxy-rc-pr", "compute-rc-pr"]'), needs.meta.outputs.run-kind))
|
||||
|| (needs.build-and-test-locally.result == 'skipped' && contains(fromJSON('["pr", "push-main", "storage-rc-pr", "proxy-rc-pr", "compute-rc-pr"]'), needs.meta.outputs.run-kind))
|
||||
|| (needs.check-codestyle-python.result == 'skipped' && contains(fromJSON('["pr", "storage-rc-pr", "proxy-rc-pr", "compute-rc-pr"]'), needs.meta.outputs.run-kind))
|
||||
|| (needs.check-codestyle-rust.result == 'skipped' && contains(fromJSON('["pr", "storage-rc-pr", "proxy-rc-pr", "compute-rc-pr"]'), needs.meta.outputs.run-kind))
|
||||
|| needs.files-changed.result == 'skipped'
|
||||
|| (needs.push-compute-image-dev.result == 'skipped' && contains(fromJSON('["push-main", "pr", "compute-release", "compute-rc-pr"]'), needs.meta.outputs.run-kind))
|
||||
|| (needs.push-neon-image-dev.result == 'skipped' && contains(fromJSON('["push-main", "pr", "storage-release", "storage-rc-pr", "proxy-release", "proxy-rc-pr"]'), needs.meta.outputs.run-kind))
|
||||
|
||||
6
Cargo.lock
generated
6
Cargo.lock
generated
@@ -4329,6 +4329,7 @@ dependencies = [
|
||||
"strum",
|
||||
"strum_macros",
|
||||
"thiserror 1.0.69",
|
||||
"tracing-utils",
|
||||
"utils",
|
||||
]
|
||||
|
||||
@@ -7115,9 +7116,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "tokio"
|
||||
version = "1.43.0"
|
||||
version = "1.43.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "3d61fa4ffa3de412bfea335c6ecff681de2b609ba3c77ef3e00e521813a9ed9e"
|
||||
checksum = "492a604e2fd7f814268a378409e6c92b5525d747d10db9a229723f55a417958c"
|
||||
dependencies = [
|
||||
"backtrace",
|
||||
"bytes",
|
||||
@@ -7603,6 +7604,7 @@ dependencies = [
|
||||
"opentelemetry-otlp",
|
||||
"opentelemetry-semantic-conventions",
|
||||
"opentelemetry_sdk",
|
||||
"pin-project-lite",
|
||||
"tokio",
|
||||
"tracing",
|
||||
"tracing-opentelemetry",
|
||||
|
||||
@@ -183,7 +183,7 @@ test-context = "0.3"
|
||||
thiserror = "1.0"
|
||||
tikv-jemallocator = { version = "0.6", features = ["profiling", "stats", "unprefixed_malloc_on_supported_platforms"] }
|
||||
tikv-jemalloc-ctl = { version = "0.6", features = ["stats"] }
|
||||
tokio = { version = "1.41", features = ["macros"] }
|
||||
tokio = { version = "1.43.1", features = ["macros"] }
|
||||
tokio-epoll-uring = { git = "https://github.com/neondatabase/tokio-epoll-uring.git" , branch = "main" }
|
||||
tokio-io-timeout = "1.2.0"
|
||||
tokio-postgres-rustls = "0.12.0"
|
||||
|
||||
@@ -292,7 +292,7 @@ WORKDIR /home/nonroot
|
||||
|
||||
# Rust
|
||||
# Please keep the version of llvm (installed above) in sync with rust llvm (`rustc --version --verbose | grep LLVM`)
|
||||
ENV RUSTC_VERSION=1.85.0
|
||||
ENV RUSTC_VERSION=1.86.0
|
||||
ENV RUSTUP_HOME="/home/nonroot/.rustup"
|
||||
ENV PATH="/home/nonroot/.cargo/bin:${PATH}"
|
||||
ARG RUSTFILT_VERSION=0.2.1
|
||||
|
||||
@@ -369,7 +369,7 @@ FROM build-deps AS plv8-src
|
||||
ARG PG_VERSION
|
||||
WORKDIR /ext-src
|
||||
|
||||
COPY compute/patches/plv8-3.1.10.patch .
|
||||
COPY compute/patches/plv8* .
|
||||
|
||||
# plv8 3.2.3 supports v17
|
||||
# last release v3.2.3 - Sep 7, 2024
|
||||
@@ -393,7 +393,7 @@ RUN case "${PG_VERSION:?}" in \
|
||||
git clone --recurse-submodules --depth 1 --branch ${PLV8_TAG} https://github.com/plv8/plv8.git plv8-src && \
|
||||
tar -czf plv8.tar.gz --exclude .git plv8-src && \
|
||||
cd plv8-src && \
|
||||
if [[ "${PG_VERSION:?}" < "v17" ]]; then patch -p1 < /ext-src/plv8-3.1.10.patch; fi
|
||||
if [[ "${PG_VERSION:?}" < "v17" ]]; then patch -p1 < /ext-src/plv8_v3.1.10.patch; else patch -p1 < /ext-src/plv8_v3.2.3.patch; fi
|
||||
|
||||
# Step 1: Build the vendored V8 engine. It doesn't depend on PostgreSQL, so use
|
||||
# 'build-deps' as the base. This enables caching and avoids unnecessary rebuilds.
|
||||
@@ -1022,67 +1022,6 @@ RUN make -j $(getconf _NPROCESSORS_ONLN) && \
|
||||
make -j $(getconf _NPROCESSORS_ONLN) install && \
|
||||
echo 'trusted = true' >> /usr/local/pgsql/share/extension/semver.control
|
||||
|
||||
#########################################################################################
|
||||
#
|
||||
# Layer "pg_embedding-build"
|
||||
# compile pg_embedding extension
|
||||
#
|
||||
#########################################################################################
|
||||
FROM build-deps AS pg_embedding-src
|
||||
ARG PG_VERSION
|
||||
|
||||
# This is our extension, support stopped in favor of pgvector
|
||||
# TODO: deprecate it
|
||||
WORKDIR /ext-src
|
||||
RUN case "${PG_VERSION:?}" in \
|
||||
"v14" | "v15") \
|
||||
export PG_EMBEDDING_VERSION=0.3.5 \
|
||||
export PG_EMBEDDING_CHECKSUM=0e95b27b8b6196e2cf0a0c9ec143fe2219b82e54c5bb4ee064e76398cbe69ae9 \
|
||||
;; \
|
||||
*) \
|
||||
echo "pg_embedding not supported on this PostgreSQL version. Use pgvector instead." && exit 0;; \
|
||||
esac && \
|
||||
wget https://github.com/neondatabase/pg_embedding/archive/refs/tags/${PG_EMBEDDING_VERSION}.tar.gz -O pg_embedding.tar.gz && \
|
||||
echo "${PG_EMBEDDING_CHECKSUM} pg_embedding.tar.gz" | sha256sum --check && \
|
||||
mkdir pg_embedding-src && cd pg_embedding-src && tar xzf ../pg_embedding.tar.gz --strip-components=1 -C .
|
||||
|
||||
FROM pg-build AS pg_embedding-build
|
||||
COPY --from=pg_embedding-src /ext-src/ /ext-src/
|
||||
WORKDIR /ext-src/
|
||||
RUN if [ -d pg_embedding-src ]; then \
|
||||
cd pg_embedding-src && \
|
||||
make -j $(getconf _NPROCESSORS_ONLN) && \
|
||||
make -j $(getconf _NPROCESSORS_ONLN) install; \
|
||||
fi
|
||||
|
||||
#########################################################################################
|
||||
#
|
||||
# Layer "pg_anon-build"
|
||||
# compile anon extension
|
||||
#
|
||||
#########################################################################################
|
||||
FROM build-deps AS pg_anon-src
|
||||
ARG PG_VERSION
|
||||
|
||||
# This is an experimental extension, never got to real production.
|
||||
# !Do not remove! It can be present in shared_preload_libraries and compute will fail to start if library is not found.
|
||||
WORKDIR /ext-src
|
||||
RUN case "${PG_VERSION:?}" in "v17") \
|
||||
echo "postgresql_anonymizer does not yet support PG17" && exit 0;; \
|
||||
esac && \
|
||||
wget https://github.com/neondatabase/postgresql_anonymizer/archive/refs/tags/neon_1.1.1.tar.gz -O pg_anon.tar.gz && \
|
||||
echo "321ea8d5c1648880aafde850a2c576e4a9e7b9933a34ce272efc839328999fa9 pg_anon.tar.gz" | sha256sum --check && \
|
||||
mkdir pg_anon-src && cd pg_anon-src && tar xzf ../pg_anon.tar.gz --strip-components=1 -C .
|
||||
|
||||
FROM pg-build AS pg_anon-build
|
||||
COPY --from=pg_anon-src /ext-src/ /ext-src/
|
||||
WORKDIR /ext-src
|
||||
RUN if [ -d pg_anon-src ]; then \
|
||||
cd pg_anon-src && \
|
||||
make -j $(getconf _NPROCESSORS_ONLN) install && \
|
||||
echo 'trusted = true' >> /usr/local/pgsql/share/extension/anon.control; \
|
||||
fi
|
||||
|
||||
#########################################################################################
|
||||
#
|
||||
# Layer "pg build with nonroot user and cargo installed"
|
||||
@@ -1366,8 +1305,8 @@ ARG PG_VERSION
|
||||
# Do not update without approve from proxy team
|
||||
# Make sure the version is reflected in proxy/src/serverless/local_conn_pool.rs
|
||||
WORKDIR /ext-src
|
||||
RUN wget https://github.com/neondatabase/pg_session_jwt/archive/refs/tags/v0.2.0.tar.gz -O pg_session_jwt.tar.gz && \
|
||||
echo "5ace028e591f2e000ca10afa5b1ca62203ebff014c2907c0ec3b29c36f28a1bb pg_session_jwt.tar.gz" | sha256sum --check && \
|
||||
RUN wget https://github.com/neondatabase/pg_session_jwt/archive/refs/tags/v0.3.0.tar.gz -O pg_session_jwt.tar.gz && \
|
||||
echo "19be2dc0b3834d643706ed430af998bb4c2cdf24b3c45e7b102bb3a550e8660c pg_session_jwt.tar.gz" | sha256sum --check && \
|
||||
mkdir pg_session_jwt-src && cd pg_session_jwt-src && tar xzf ../pg_session_jwt.tar.gz --strip-components=1 -C . && \
|
||||
sed -i 's/pgrx = "0.12.6"/pgrx = { version = "0.12.9", features = [ "unsafe-postgres" ] }/g' Cargo.toml && \
|
||||
sed -i 's/version = "0.12.6"/version = "0.12.9"/g' pgrx-tests/Cargo.toml && \
|
||||
@@ -1588,6 +1527,51 @@ COPY --from=pgauditlogtofile-src /ext-src/ /ext-src/
|
||||
WORKDIR /ext-src/pgauditlogtofile-src
|
||||
RUN make install USE_PGXS=1 -j $(getconf _NPROCESSORS_ONLN)
|
||||
|
||||
#########################################################################################
|
||||
#
|
||||
# Layer "pg_rest-build"
|
||||
# compile pg_rest extension
|
||||
#
|
||||
#########################################################################################
|
||||
FROM build-deps AS pg_rest-src
|
||||
ARG PG_VERSION
|
||||
ARG PG_REST_VERSION=3.0.1
|
||||
|
||||
# Only supported for PostgreSQL v17
|
||||
RUN if [ "${PG_VERSION:?}" != "v17" ]; then \
|
||||
echo "pg_rest extension is only supported for PostgreSQL v17" && exit 0; \
|
||||
fi
|
||||
|
||||
WORKDIR /ext-src
|
||||
RUN mkdir -p pg_rest-src && cd pg_rest-src && \
|
||||
wget https://github.com/ruslantalpa/foxfirebase/raw/main/pg_rest_pg17-${PG_REST_VERSION}_neon-debian-bookworm_aarch64.deb -O pg_rest_pg17-${PG_REST_VERSION}_aarch64.deb && \
|
||||
wget https://github.com/ruslantalpa/foxfirebase/raw/main/pg_rest_pg17-${PG_REST_VERSION}_neon-debian-bookworm_amd64.deb -O pg_rest_pg17-${PG_REST_VERSION}_amd64.deb
|
||||
|
||||
FROM pg-build AS pg_rest-build
|
||||
ARG PG_REST_VERSION=3.0.1
|
||||
ARG PG_VERSION
|
||||
COPY --from=pg_rest-src /ext-src/ /ext-src/
|
||||
WORKDIR /ext-src/pg_rest-src
|
||||
RUN if [ "${PG_VERSION:?}" = "v17" ]; then \
|
||||
export ARCH=$(uname -m) && \
|
||||
echo "ARCH: $ARCH" && \
|
||||
# Map architecture names to package names
|
||||
if [ "$ARCH" = "x86_64" ]; then \
|
||||
PACKAGE_ARCH="amd64"; \
|
||||
else \
|
||||
PACKAGE_ARCH="$ARCH"; \
|
||||
fi && \
|
||||
echo "Using package architecture: $PACKAGE_ARCH" && \
|
||||
apt-get update && \
|
||||
apt-get install -y ./pg_rest_pg17-${PG_REST_VERSION}_${PACKAGE_ARCH}.deb && \
|
||||
sed -i 's/superuser = false/superuser = true/g' /usr/local/pgsql/share/extension/pg_rest.control && \
|
||||
echo 'trusted = true' >> /usr/local/pgsql/share/extension/pg_rest.control && \
|
||||
# Clean up
|
||||
apt-get clean && rm -rf /var/lib/apt/lists/*; \
|
||||
else \
|
||||
echo "pg_rest extension is only supported for PostgreSQL v17, skipping build"; \
|
||||
fi
|
||||
|
||||
#########################################################################################
|
||||
#
|
||||
# Layer "neon-ext-build"
|
||||
@@ -1675,9 +1659,7 @@ COPY --from=rdkit-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY --from=pg_uuidv7-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY --from=pg_roaringbitmap-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY --from=pg_semver-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY --from=pg_embedding-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY --from=wal2json-build /usr/local/pgsql /usr/local/pgsql
|
||||
COPY --from=pg_anon-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY --from=pg_ivm-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY --from=pg_partman-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY --from=pg_mooncake-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
@@ -1685,6 +1667,7 @@ COPY --from=pg_duckdb-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY --from=pg_repack-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY --from=pgaudit-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY --from=pgauditlogtofile-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY --from=pg_rest-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
|
||||
#########################################################################################
|
||||
#
|
||||
@@ -1853,7 +1836,6 @@ COPY --from=pg_cron-src /ext-src/ /ext-src/
|
||||
COPY --from=pg_uuidv7-src /ext-src/ /ext-src/
|
||||
COPY --from=pg_roaringbitmap-src /ext-src/ /ext-src/
|
||||
COPY --from=pg_semver-src /ext-src/ /ext-src/
|
||||
#COPY --from=pg_embedding-src /ext-src/ /ext-src/
|
||||
#COPY --from=wal2json-src /ext-src/ /ext-src/
|
||||
COPY --from=pg_ivm-src /ext-src/ /ext-src/
|
||||
COPY --from=pg_partman-src /ext-src/ /ext-src/
|
||||
@@ -1862,7 +1844,6 @@ COPY --from=pg_repack-src /ext-src/ /ext-src/
|
||||
COPY --from=pg_repack-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY compute/patches/pg_repack.patch /ext-src
|
||||
RUN cd /ext-src/pg_repack-src && patch -p1 </ext-src/pg_repack.patch && rm -f /ext-src/pg_repack.patch
|
||||
|
||||
COPY --chmod=755 docker-compose/run-tests.sh /run-tests.sh
|
||||
RUN apt-get update && apt-get install -y libtap-parser-sourcehandler-pgtap-perl\
|
||||
&& apt clean && rm -rf /ext-src/*.tar.gz /var/lib/apt/lists/*
|
||||
@@ -1995,3 +1976,4 @@ RUN mkdir /var/run/rsyslogd && \
|
||||
ENV LANG=en_US.utf8
|
||||
USER postgres
|
||||
ENTRYPOINT ["/usr/local/bin/compute_ctl"]
|
||||
|
||||
|
||||
@@ -33,6 +33,7 @@
|
||||
import 'sql_exporter/lfc_hits.libsonnet',
|
||||
import 'sql_exporter/lfc_misses.libsonnet',
|
||||
import 'sql_exporter/lfc_used.libsonnet',
|
||||
import 'sql_exporter/lfc_used_pages.libsonnet',
|
||||
import 'sql_exporter/lfc_writes.libsonnet',
|
||||
import 'sql_exporter/logical_slot_restart_lsn.libsonnet',
|
||||
import 'sql_exporter/max_cluster_size.libsonnet',
|
||||
|
||||
10
compute/etc/sql_exporter/lfc_used_pages.libsonnet
Normal file
10
compute/etc/sql_exporter/lfc_used_pages.libsonnet
Normal file
@@ -0,0 +1,10 @@
|
||||
{
|
||||
metric_name: 'lfc_used_pages',
|
||||
type: 'gauge',
|
||||
help: 'LFC pages used',
|
||||
key_labels: null,
|
||||
values: [
|
||||
'lfc_used_pages',
|
||||
],
|
||||
query: importstr 'sql_exporter/lfc_used_pages.sql',
|
||||
}
|
||||
1
compute/etc/sql_exporter/lfc_used_pages.sql
Normal file
1
compute/etc/sql_exporter/lfc_used_pages.sql
Normal file
@@ -0,0 +1 @@
|
||||
SELECT lfc_value AS lfc_used_pages FROM neon.neon_lfc_stats WHERE lfc_key = 'file_cache_used_pages';
|
||||
@@ -202,10 +202,10 @@ index cf0b80d616..e8e2a14a4a 100644
|
||||
COMMENT ON CONSTRAINT the_constraint ON constraint_comments_tbl IS 'no, the comment';
|
||||
ERROR: must be owner of relation constraint_comments_tbl
|
||||
diff --git a/src/test/regress/expected/conversion.out b/src/test/regress/expected/conversion.out
|
||||
index 442e7aff2b..525f732b03 100644
|
||||
index d785f92561..16377e5ac9 100644
|
||||
--- a/src/test/regress/expected/conversion.out
|
||||
+++ b/src/test/regress/expected/conversion.out
|
||||
@@ -8,7 +8,7 @@
|
||||
@@ -15,7 +15,7 @@ SELECT FROM test_enc_setup();
|
||||
CREATE FUNCTION test_enc_conversion(bytea, name, name, bool, validlen OUT int, result OUT bytea)
|
||||
AS :'regresslib', 'test_enc_conversion'
|
||||
LANGUAGE C STRICT;
|
||||
@@ -587,16 +587,15 @@ index f551624afb..57f1e432d4 100644
|
||||
SELECT *
|
||||
INTO TABLE ramp
|
||||
diff --git a/src/test/regress/expected/database.out b/src/test/regress/expected/database.out
|
||||
index 454db91ec0..01378d7081 100644
|
||||
index 4cbdbdf84d..573362850e 100644
|
||||
--- a/src/test/regress/expected/database.out
|
||||
+++ b/src/test/regress/expected/database.out
|
||||
@@ -1,8 +1,7 @@
|
||||
@@ -1,8 +1,6 @@
|
||||
CREATE DATABASE regression_tbd
|
||||
ENCODING utf8 LC_COLLATE "C" LC_CTYPE "C" TEMPLATE template0;
|
||||
ALTER DATABASE regression_tbd RENAME TO regression_utf8;
|
||||
-ALTER DATABASE regression_utf8 SET TABLESPACE regress_tblspace;
|
||||
-ALTER DATABASE regression_utf8 RESET TABLESPACE;
|
||||
+WARNING: you need to manually restart any running background workers after this command
|
||||
ALTER DATABASE regression_utf8 CONNECTION_LIMIT 123;
|
||||
-- Test PgDatabaseToastTable. Doing this with GRANT would be slow.
|
||||
BEGIN;
|
||||
@@ -700,7 +699,7 @@ index 6ed50fdcfa..caa00a345d 100644
|
||||
COMMENT ON FOREIGN DATA WRAPPER dummy IS 'useless';
|
||||
CREATE FOREIGN DATA WRAPPER postgresql VALIDATOR postgresql_fdw_validator;
|
||||
diff --git a/src/test/regress/expected/foreign_key.out b/src/test/regress/expected/foreign_key.out
|
||||
index 6b8c2f2414..8e13b7fa46 100644
|
||||
index 84745b9f60..4883c12351 100644
|
||||
--- a/src/test/regress/expected/foreign_key.out
|
||||
+++ b/src/test/regress/expected/foreign_key.out
|
||||
@@ -1985,7 +1985,7 @@ ALTER TABLE fk_partitioned_fk_6 ATTACH PARTITION fk_partitioned_pk_6 FOR VALUES
|
||||
@@ -1112,7 +1111,7 @@ index 8475231735..0653946337 100644
|
||||
DROP ROLE regress_passwd_sha_len1;
|
||||
DROP ROLE regress_passwd_sha_len2;
|
||||
diff --git a/src/test/regress/expected/privileges.out b/src/test/regress/expected/privileges.out
|
||||
index 5b9dba7b32..cc408dad42 100644
|
||||
index 620fbe8c52..0570102357 100644
|
||||
--- a/src/test/regress/expected/privileges.out
|
||||
+++ b/src/test/regress/expected/privileges.out
|
||||
@@ -20,19 +20,19 @@ SELECT lo_unlink(oid) FROM pg_largeobject_metadata WHERE oid >= 1000 AND oid < 3
|
||||
@@ -1174,8 +1173,8 @@ index 5b9dba7b32..cc408dad42 100644
|
||||
+CREATE GROUP regress_priv_group2 WITH ADMIN regress_priv_user1 PASSWORD NEON_PASSWORD_PLACEHOLDER USER regress_priv_user2;
|
||||
ALTER GROUP regress_priv_group1 ADD USER regress_priv_user4;
|
||||
GRANT regress_priv_group2 TO regress_priv_user2 GRANTED BY regress_priv_user1;
|
||||
SET SESSION AUTHORIZATION regress_priv_user1;
|
||||
@@ -239,12 +239,16 @@ GRANT regress_priv_role TO regress_priv_user1 WITH ADMIN OPTION GRANTED BY regre
|
||||
SET SESSION AUTHORIZATION regress_priv_user3;
|
||||
@@ -246,12 +246,16 @@ GRANT regress_priv_role TO regress_priv_user1 WITH ADMIN OPTION GRANTED BY regre
|
||||
ERROR: permission denied to grant privileges as role "regress_priv_role"
|
||||
DETAIL: The grantor must have the ADMIN option on role "regress_priv_role".
|
||||
GRANT regress_priv_role TO regress_priv_user1 WITH ADMIN OPTION GRANTED BY CURRENT_ROLE;
|
||||
@@ -1192,7 +1191,7 @@ index 5b9dba7b32..cc408dad42 100644
|
||||
DROP ROLE regress_priv_role;
|
||||
SET SESSION AUTHORIZATION regress_priv_user1;
|
||||
SELECT session_user, current_user;
|
||||
@@ -1776,7 +1780,7 @@ SELECT has_table_privilege('regress_priv_user1', 'atest4', 'SELECT WITH GRANT OP
|
||||
@@ -1783,7 +1787,7 @@ SELECT has_table_privilege('regress_priv_user1', 'atest4', 'SELECT WITH GRANT OP
|
||||
|
||||
-- security-restricted operations
|
||||
\c -
|
||||
@@ -1201,7 +1200,7 @@ index 5b9dba7b32..cc408dad42 100644
|
||||
-- Check that index expressions and predicates are run as the table's owner
|
||||
-- A dummy index function checking current_user
|
||||
CREATE FUNCTION sro_ifun(int) RETURNS int AS $$
|
||||
@@ -2668,8 +2672,8 @@ drop cascades to function testns.priv_testagg(integer)
|
||||
@@ -2675,8 +2679,8 @@ drop cascades to function testns.priv_testagg(integer)
|
||||
drop cascades to function testns.priv_testproc(integer)
|
||||
-- Change owner of the schema & and rename of new schema owner
|
||||
\c -
|
||||
@@ -1212,7 +1211,7 @@ index 5b9dba7b32..cc408dad42 100644
|
||||
SET SESSION ROLE regress_schemauser1;
|
||||
CREATE SCHEMA testns;
|
||||
SELECT nspname, rolname FROM pg_namespace, pg_roles WHERE pg_namespace.nspname = 'testns' AND pg_namespace.nspowner = pg_roles.oid;
|
||||
@@ -2792,7 +2796,7 @@ DROP USER regress_priv_user7;
|
||||
@@ -2799,7 +2803,7 @@ DROP USER regress_priv_user7;
|
||||
DROP USER regress_priv_user8; -- does not exist
|
||||
ERROR: role "regress_priv_user8" does not exist
|
||||
-- permissions with LOCK TABLE
|
||||
@@ -1221,7 +1220,7 @@ index 5b9dba7b32..cc408dad42 100644
|
||||
CREATE TABLE lock_table (a int);
|
||||
-- LOCK TABLE and SELECT permission
|
||||
GRANT SELECT ON lock_table TO regress_locktable_user;
|
||||
@@ -2874,7 +2878,7 @@ DROP USER regress_locktable_user;
|
||||
@@ -2881,7 +2885,7 @@ DROP USER regress_locktable_user;
|
||||
-- pg_backend_memory_contexts.
|
||||
-- switch to superuser
|
||||
\c -
|
||||
@@ -1230,7 +1229,7 @@ index 5b9dba7b32..cc408dad42 100644
|
||||
SELECT has_table_privilege('regress_readallstats','pg_backend_memory_contexts','SELECT'); -- no
|
||||
has_table_privilege
|
||||
---------------------
|
||||
@@ -2918,10 +2922,10 @@ RESET ROLE;
|
||||
@@ -2925,10 +2929,10 @@ RESET ROLE;
|
||||
-- clean up
|
||||
DROP ROLE regress_readallstats;
|
||||
-- test role grantor machinery
|
||||
@@ -1245,7 +1244,7 @@ index 5b9dba7b32..cc408dad42 100644
|
||||
GRANT regress_group TO regress_group_direct_manager WITH INHERIT FALSE, ADMIN TRUE;
|
||||
GRANT regress_group_direct_manager TO regress_group_indirect_manager;
|
||||
SET SESSION AUTHORIZATION regress_group_direct_manager;
|
||||
@@ -2950,9 +2954,9 @@ DROP ROLE regress_group_direct_manager;
|
||||
@@ -2957,9 +2961,9 @@ DROP ROLE regress_group_direct_manager;
|
||||
DROP ROLE regress_group_indirect_manager;
|
||||
DROP ROLE regress_group_member;
|
||||
-- test SET and INHERIT options with object ownership changes
|
||||
@@ -1841,7 +1840,7 @@ index 09a255649b..15895f0c53 100644
|
||||
CREATE TABLE ruletest_t2 (x int);
|
||||
CREATE VIEW ruletest_v1 WITH (security_invoker=true) AS
|
||||
diff --git a/src/test/regress/expected/security_label.out b/src/test/regress/expected/security_label.out
|
||||
index a8e01a6220..5a9cef4ede 100644
|
||||
index a8e01a6220..83543b250a 100644
|
||||
--- a/src/test/regress/expected/security_label.out
|
||||
+++ b/src/test/regress/expected/security_label.out
|
||||
@@ -6,8 +6,8 @@ SET client_min_messages TO 'warning';
|
||||
@@ -1855,34 +1854,6 @@ index a8e01a6220..5a9cef4ede 100644
|
||||
CREATE TABLE seclabel_tbl1 (a int, b text);
|
||||
CREATE TABLE seclabel_tbl2 (x int, y text);
|
||||
CREATE VIEW seclabel_view1 AS SELECT * FROM seclabel_tbl2;
|
||||
@@ -19,21 +19,21 @@ ALTER TABLE seclabel_tbl2 OWNER TO regress_seclabel_user2;
|
||||
-- Test of SECURITY LABEL statement without a plugin
|
||||
--
|
||||
SECURITY LABEL ON TABLE seclabel_tbl1 IS 'classified'; -- fail
|
||||
-ERROR: no security label providers have been loaded
|
||||
+ERROR: must specify provider when multiple security label providers have been loaded
|
||||
SECURITY LABEL FOR 'dummy' ON TABLE seclabel_tbl1 IS 'classified'; -- fail
|
||||
ERROR: security label provider "dummy" is not loaded
|
||||
SECURITY LABEL ON TABLE seclabel_tbl1 IS '...invalid label...'; -- fail
|
||||
-ERROR: no security label providers have been loaded
|
||||
+ERROR: must specify provider when multiple security label providers have been loaded
|
||||
SECURITY LABEL ON TABLE seclabel_tbl3 IS 'unclassified'; -- fail
|
||||
-ERROR: no security label providers have been loaded
|
||||
+ERROR: must specify provider when multiple security label providers have been loaded
|
||||
SECURITY LABEL ON ROLE regress_seclabel_user1 IS 'classified'; -- fail
|
||||
-ERROR: no security label providers have been loaded
|
||||
+ERROR: must specify provider when multiple security label providers have been loaded
|
||||
SECURITY LABEL FOR 'dummy' ON ROLE regress_seclabel_user1 IS 'classified'; -- fail
|
||||
ERROR: security label provider "dummy" is not loaded
|
||||
SECURITY LABEL ON ROLE regress_seclabel_user1 IS '...invalid label...'; -- fail
|
||||
-ERROR: no security label providers have been loaded
|
||||
+ERROR: must specify provider when multiple security label providers have been loaded
|
||||
SECURITY LABEL ON ROLE regress_seclabel_user3 IS 'unclassified'; -- fail
|
||||
-ERROR: no security label providers have been loaded
|
||||
+ERROR: must specify provider when multiple security label providers have been loaded
|
||||
-- clean up objects
|
||||
DROP FUNCTION seclabel_four();
|
||||
DROP DOMAIN seclabel_domain;
|
||||
diff --git a/src/test/regress/expected/select_into.out b/src/test/regress/expected/select_into.out
|
||||
index b79fe9a1c0..e29fab88ab 100644
|
||||
--- a/src/test/regress/expected/select_into.out
|
||||
@@ -2413,10 +2384,10 @@ index e3e3bea709..fa86ddc326 100644
|
||||
COMMENT ON CONSTRAINT the_constraint ON constraint_comments_tbl IS 'no, the comment';
|
||||
COMMENT ON CONSTRAINT the_constraint ON DOMAIN constraint_comments_dom IS 'no, another comment';
|
||||
diff --git a/src/test/regress/sql/conversion.sql b/src/test/regress/sql/conversion.sql
|
||||
index 9a65fca91f..58431a3056 100644
|
||||
index b567a1a572..4d1ac2e631 100644
|
||||
--- a/src/test/regress/sql/conversion.sql
|
||||
+++ b/src/test/regress/sql/conversion.sql
|
||||
@@ -12,7 +12,7 @@ CREATE FUNCTION test_enc_conversion(bytea, name, name, bool, validlen OUT int, r
|
||||
@@ -17,7 +17,7 @@ CREATE FUNCTION test_enc_conversion(bytea, name, name, bool, validlen OUT int, r
|
||||
AS :'regresslib', 'test_enc_conversion'
|
||||
LANGUAGE C STRICT;
|
||||
|
||||
@@ -2780,7 +2751,7 @@ index ae6841308b..47bc792e30 100644
|
||||
|
||||
SELECT *
|
||||
diff --git a/src/test/regress/sql/database.sql b/src/test/regress/sql/database.sql
|
||||
index 0367c0e37a..a23b98c4bd 100644
|
||||
index 46ad263478..eb05584ed5 100644
|
||||
--- a/src/test/regress/sql/database.sql
|
||||
+++ b/src/test/regress/sql/database.sql
|
||||
@@ -1,8 +1,6 @@
|
||||
@@ -2893,7 +2864,7 @@ index aa147b14a9..370e0dd570 100644
|
||||
CREATE FOREIGN DATA WRAPPER dummy;
|
||||
COMMENT ON FOREIGN DATA WRAPPER dummy IS 'useless';
|
||||
diff --git a/src/test/regress/sql/foreign_key.sql b/src/test/regress/sql/foreign_key.sql
|
||||
index 45c7a534cb..32dd26b8cd 100644
|
||||
index 9f4210b26e..620d3fc87e 100644
|
||||
--- a/src/test/regress/sql/foreign_key.sql
|
||||
+++ b/src/test/regress/sql/foreign_key.sql
|
||||
@@ -1435,7 +1435,7 @@ ALTER TABLE fk_partitioned_fk_6 ATTACH PARTITION fk_partitioned_pk_6 FOR VALUES
|
||||
@@ -3246,7 +3217,7 @@ index 53e86b0b6c..0303fdfe96 100644
|
||||
-- Check that the invalid secrets were re-hashed. A re-hashed secret
|
||||
-- should not contain the original salt.
|
||||
diff --git a/src/test/regress/sql/privileges.sql b/src/test/regress/sql/privileges.sql
|
||||
index 249df17a58..b258e7f26a 100644
|
||||
index 259f1aedd1..6e1a3d17b7 100644
|
||||
--- a/src/test/regress/sql/privileges.sql
|
||||
+++ b/src/test/regress/sql/privileges.sql
|
||||
@@ -24,18 +24,18 @@ RESET client_min_messages;
|
||||
@@ -3308,7 +3279,7 @@ index 249df17a58..b258e7f26a 100644
|
||||
|
||||
ALTER GROUP regress_priv_group1 ADD USER regress_priv_user4;
|
||||
|
||||
@@ -1157,7 +1157,7 @@ SELECT has_table_privilege('regress_priv_user1', 'atest4', 'SELECT WITH GRANT OP
|
||||
@@ -1160,7 +1160,7 @@ SELECT has_table_privilege('regress_priv_user1', 'atest4', 'SELECT WITH GRANT OP
|
||||
|
||||
-- security-restricted operations
|
||||
\c -
|
||||
@@ -3317,7 +3288,7 @@ index 249df17a58..b258e7f26a 100644
|
||||
|
||||
-- Check that index expressions and predicates are run as the table's owner
|
||||
|
||||
@@ -1653,8 +1653,8 @@ DROP SCHEMA testns CASCADE;
|
||||
@@ -1656,8 +1656,8 @@ DROP SCHEMA testns CASCADE;
|
||||
-- Change owner of the schema & and rename of new schema owner
|
||||
\c -
|
||||
|
||||
@@ -3328,7 +3299,7 @@ index 249df17a58..b258e7f26a 100644
|
||||
|
||||
SET SESSION ROLE regress_schemauser1;
|
||||
CREATE SCHEMA testns;
|
||||
@@ -1748,7 +1748,7 @@ DROP USER regress_priv_user8; -- does not exist
|
||||
@@ -1751,7 +1751,7 @@ DROP USER regress_priv_user8; -- does not exist
|
||||
|
||||
|
||||
-- permissions with LOCK TABLE
|
||||
@@ -3337,7 +3308,7 @@ index 249df17a58..b258e7f26a 100644
|
||||
CREATE TABLE lock_table (a int);
|
||||
|
||||
-- LOCK TABLE and SELECT permission
|
||||
@@ -1836,7 +1836,7 @@ DROP USER regress_locktable_user;
|
||||
@@ -1839,7 +1839,7 @@ DROP USER regress_locktable_user;
|
||||
-- switch to superuser
|
||||
\c -
|
||||
|
||||
@@ -3346,7 +3317,7 @@ index 249df17a58..b258e7f26a 100644
|
||||
|
||||
SELECT has_table_privilege('regress_readallstats','pg_backend_memory_contexts','SELECT'); -- no
|
||||
SELECT has_table_privilege('regress_readallstats','pg_shmem_allocations','SELECT'); -- no
|
||||
@@ -1856,10 +1856,10 @@ RESET ROLE;
|
||||
@@ -1859,10 +1859,10 @@ RESET ROLE;
|
||||
DROP ROLE regress_readallstats;
|
||||
|
||||
-- test role grantor machinery
|
||||
@@ -3361,7 +3332,7 @@ index 249df17a58..b258e7f26a 100644
|
||||
|
||||
GRANT regress_group TO regress_group_direct_manager WITH INHERIT FALSE, ADMIN TRUE;
|
||||
GRANT regress_group_direct_manager TO regress_group_indirect_manager;
|
||||
@@ -1881,9 +1881,9 @@ DROP ROLE regress_group_indirect_manager;
|
||||
@@ -1884,9 +1884,9 @@ DROP ROLE regress_group_indirect_manager;
|
||||
DROP ROLE regress_group_member;
|
||||
|
||||
-- test SET and INHERIT options with object ownership changes
|
||||
|
||||
@@ -202,10 +202,10 @@ index cf0b80d616..e8e2a14a4a 100644
|
||||
COMMENT ON CONSTRAINT the_constraint ON constraint_comments_tbl IS 'no, the comment';
|
||||
ERROR: must be owner of relation constraint_comments_tbl
|
||||
diff --git a/src/test/regress/expected/conversion.out b/src/test/regress/expected/conversion.out
|
||||
index 442e7aff2b..525f732b03 100644
|
||||
index d785f92561..16377e5ac9 100644
|
||||
--- a/src/test/regress/expected/conversion.out
|
||||
+++ b/src/test/regress/expected/conversion.out
|
||||
@@ -8,7 +8,7 @@
|
||||
@@ -15,7 +15,7 @@ SELECT FROM test_enc_setup();
|
||||
CREATE FUNCTION test_enc_conversion(bytea, name, name, bool, validlen OUT int, result OUT bytea)
|
||||
AS :'regresslib', 'test_enc_conversion'
|
||||
LANGUAGE C STRICT;
|
||||
@@ -587,16 +587,15 @@ index f551624afb..57f1e432d4 100644
|
||||
SELECT *
|
||||
INTO TABLE ramp
|
||||
diff --git a/src/test/regress/expected/database.out b/src/test/regress/expected/database.out
|
||||
index 454db91ec0..01378d7081 100644
|
||||
index 4cbdbdf84d..573362850e 100644
|
||||
--- a/src/test/regress/expected/database.out
|
||||
+++ b/src/test/regress/expected/database.out
|
||||
@@ -1,8 +1,7 @@
|
||||
@@ -1,8 +1,6 @@
|
||||
CREATE DATABASE regression_tbd
|
||||
ENCODING utf8 LC_COLLATE "C" LC_CTYPE "C" TEMPLATE template0;
|
||||
ALTER DATABASE regression_tbd RENAME TO regression_utf8;
|
||||
-ALTER DATABASE regression_utf8 SET TABLESPACE regress_tblspace;
|
||||
-ALTER DATABASE regression_utf8 RESET TABLESPACE;
|
||||
+WARNING: you need to manually restart any running background workers after this command
|
||||
ALTER DATABASE regression_utf8 CONNECTION_LIMIT 123;
|
||||
-- Test PgDatabaseToastTable. Doing this with GRANT would be slow.
|
||||
BEGIN;
|
||||
@@ -700,7 +699,7 @@ index 6ed50fdcfa..caa00a345d 100644
|
||||
COMMENT ON FOREIGN DATA WRAPPER dummy IS 'useless';
|
||||
CREATE FOREIGN DATA WRAPPER postgresql VALIDATOR postgresql_fdw_validator;
|
||||
diff --git a/src/test/regress/expected/foreign_key.out b/src/test/regress/expected/foreign_key.out
|
||||
index 69994c98e3..129abcfbe8 100644
|
||||
index fe6a1015f2..614b387b7d 100644
|
||||
--- a/src/test/regress/expected/foreign_key.out
|
||||
+++ b/src/test/regress/expected/foreign_key.out
|
||||
@@ -1985,7 +1985,7 @@ ALTER TABLE fk_partitioned_fk_6 ATTACH PARTITION fk_partitioned_pk_6 FOR VALUES
|
||||
@@ -1147,7 +1146,7 @@ index 924d6e001d..7fdda73439 100644
|
||||
DROP ROLE regress_passwd_sha_len1;
|
||||
DROP ROLE regress_passwd_sha_len2;
|
||||
diff --git a/src/test/regress/expected/privileges.out b/src/test/regress/expected/privileges.out
|
||||
index 1296da0d57..f43fffa44c 100644
|
||||
index e8c668e0a1..03be5c2120 100644
|
||||
--- a/src/test/regress/expected/privileges.out
|
||||
+++ b/src/test/regress/expected/privileges.out
|
||||
@@ -20,19 +20,19 @@ SELECT lo_unlink(oid) FROM pg_largeobject_metadata WHERE oid >= 1000 AND oid < 3
|
||||
@@ -1209,8 +1208,8 @@ index 1296da0d57..f43fffa44c 100644
|
||||
+CREATE GROUP regress_priv_group2 WITH ADMIN regress_priv_user1 PASSWORD NEON_PASSWORD_PLACEHOLDER USER regress_priv_user2;
|
||||
ALTER GROUP regress_priv_group1 ADD USER regress_priv_user4;
|
||||
GRANT regress_priv_group2 TO regress_priv_user2 GRANTED BY regress_priv_user1;
|
||||
SET SESSION AUTHORIZATION regress_priv_user1;
|
||||
@@ -239,12 +239,16 @@ GRANT regress_priv_role TO regress_priv_user1 WITH ADMIN OPTION GRANTED BY regre
|
||||
SET SESSION AUTHORIZATION regress_priv_user3;
|
||||
@@ -246,12 +246,16 @@ GRANT regress_priv_role TO regress_priv_user1 WITH ADMIN OPTION GRANTED BY regre
|
||||
ERROR: permission denied to grant privileges as role "regress_priv_role"
|
||||
DETAIL: The grantor must have the ADMIN option on role "regress_priv_role".
|
||||
GRANT regress_priv_role TO regress_priv_user1 WITH ADMIN OPTION GRANTED BY CURRENT_ROLE;
|
||||
@@ -1227,7 +1226,7 @@ index 1296da0d57..f43fffa44c 100644
|
||||
DROP ROLE regress_priv_role;
|
||||
SET SESSION AUTHORIZATION regress_priv_user1;
|
||||
SELECT session_user, current_user;
|
||||
@@ -1776,7 +1780,7 @@ SELECT has_table_privilege('regress_priv_user1', 'atest4', 'SELECT WITH GRANT OP
|
||||
@@ -1783,7 +1787,7 @@ SELECT has_table_privilege('regress_priv_user1', 'atest4', 'SELECT WITH GRANT OP
|
||||
|
||||
-- security-restricted operations
|
||||
\c -
|
||||
@@ -1236,7 +1235,7 @@ index 1296da0d57..f43fffa44c 100644
|
||||
-- Check that index expressions and predicates are run as the table's owner
|
||||
-- A dummy index function checking current_user
|
||||
CREATE FUNCTION sro_ifun(int) RETURNS int AS $$
|
||||
@@ -2668,8 +2672,8 @@ drop cascades to function testns.priv_testagg(integer)
|
||||
@@ -2675,8 +2679,8 @@ drop cascades to function testns.priv_testagg(integer)
|
||||
drop cascades to function testns.priv_testproc(integer)
|
||||
-- Change owner of the schema & and rename of new schema owner
|
||||
\c -
|
||||
@@ -1247,7 +1246,7 @@ index 1296da0d57..f43fffa44c 100644
|
||||
SET SESSION ROLE regress_schemauser1;
|
||||
CREATE SCHEMA testns;
|
||||
SELECT nspname, rolname FROM pg_namespace, pg_roles WHERE pg_namespace.nspname = 'testns' AND pg_namespace.nspowner = pg_roles.oid;
|
||||
@@ -2792,7 +2796,7 @@ DROP USER regress_priv_user7;
|
||||
@@ -2799,7 +2803,7 @@ DROP USER regress_priv_user7;
|
||||
DROP USER regress_priv_user8; -- does not exist
|
||||
ERROR: role "regress_priv_user8" does not exist
|
||||
-- permissions with LOCK TABLE
|
||||
@@ -1256,7 +1255,7 @@ index 1296da0d57..f43fffa44c 100644
|
||||
CREATE TABLE lock_table (a int);
|
||||
-- LOCK TABLE and SELECT permission
|
||||
GRANT SELECT ON lock_table TO regress_locktable_user;
|
||||
@@ -2888,7 +2892,7 @@ DROP USER regress_locktable_user;
|
||||
@@ -2895,7 +2899,7 @@ DROP USER regress_locktable_user;
|
||||
-- pg_backend_memory_contexts.
|
||||
-- switch to superuser
|
||||
\c -
|
||||
@@ -1265,7 +1264,7 @@ index 1296da0d57..f43fffa44c 100644
|
||||
SELECT has_table_privilege('regress_readallstats','pg_backend_memory_contexts','SELECT'); -- no
|
||||
has_table_privilege
|
||||
---------------------
|
||||
@@ -2932,10 +2936,10 @@ RESET ROLE;
|
||||
@@ -2939,10 +2943,10 @@ RESET ROLE;
|
||||
-- clean up
|
||||
DROP ROLE regress_readallstats;
|
||||
-- test role grantor machinery
|
||||
@@ -1280,7 +1279,7 @@ index 1296da0d57..f43fffa44c 100644
|
||||
GRANT regress_group TO regress_group_direct_manager WITH INHERIT FALSE, ADMIN TRUE;
|
||||
GRANT regress_group_direct_manager TO regress_group_indirect_manager;
|
||||
SET SESSION AUTHORIZATION regress_group_direct_manager;
|
||||
@@ -2964,9 +2968,9 @@ DROP ROLE regress_group_direct_manager;
|
||||
@@ -2971,9 +2975,9 @@ DROP ROLE regress_group_direct_manager;
|
||||
DROP ROLE regress_group_indirect_manager;
|
||||
DROP ROLE regress_group_member;
|
||||
-- test SET and INHERIT options with object ownership changes
|
||||
@@ -1293,7 +1292,7 @@ index 1296da0d57..f43fffa44c 100644
|
||||
CREATE SCHEMA regress_roleoption;
|
||||
GRANT CREATE, USAGE ON SCHEMA regress_roleoption TO PUBLIC;
|
||||
GRANT regress_roleoption_donor TO regress_roleoption_protagonist WITH INHERIT TRUE, SET FALSE;
|
||||
@@ -2995,9 +2999,9 @@ DROP ROLE regress_roleoption_protagonist;
|
||||
@@ -3002,9 +3006,9 @@ DROP ROLE regress_roleoption_protagonist;
|
||||
DROP ROLE regress_roleoption_donor;
|
||||
DROP ROLE regress_roleoption_recipient;
|
||||
-- MAINTAIN
|
||||
@@ -2433,10 +2432,10 @@ index e3e3bea709..fa86ddc326 100644
|
||||
COMMENT ON CONSTRAINT the_constraint ON constraint_comments_tbl IS 'no, the comment';
|
||||
COMMENT ON CONSTRAINT the_constraint ON DOMAIN constraint_comments_dom IS 'no, another comment';
|
||||
diff --git a/src/test/regress/sql/conversion.sql b/src/test/regress/sql/conversion.sql
|
||||
index 9a65fca91f..58431a3056 100644
|
||||
index b567a1a572..4d1ac2e631 100644
|
||||
--- a/src/test/regress/sql/conversion.sql
|
||||
+++ b/src/test/regress/sql/conversion.sql
|
||||
@@ -12,7 +12,7 @@ CREATE FUNCTION test_enc_conversion(bytea, name, name, bool, validlen OUT int, r
|
||||
@@ -17,7 +17,7 @@ CREATE FUNCTION test_enc_conversion(bytea, name, name, bool, validlen OUT int, r
|
||||
AS :'regresslib', 'test_enc_conversion'
|
||||
LANGUAGE C STRICT;
|
||||
|
||||
@@ -2800,7 +2799,7 @@ index ae6841308b..47bc792e30 100644
|
||||
|
||||
SELECT *
|
||||
diff --git a/src/test/regress/sql/database.sql b/src/test/regress/sql/database.sql
|
||||
index 0367c0e37a..a23b98c4bd 100644
|
||||
index 46ad263478..eb05584ed5 100644
|
||||
--- a/src/test/regress/sql/database.sql
|
||||
+++ b/src/test/regress/sql/database.sql
|
||||
@@ -1,8 +1,6 @@
|
||||
@@ -2913,7 +2912,7 @@ index aa147b14a9..370e0dd570 100644
|
||||
CREATE FOREIGN DATA WRAPPER dummy;
|
||||
COMMENT ON FOREIGN DATA WRAPPER dummy IS 'useless';
|
||||
diff --git a/src/test/regress/sql/foreign_key.sql b/src/test/regress/sql/foreign_key.sql
|
||||
index 2e710e419c..89cd481a54 100644
|
||||
index 8c4e4c7c83..e946cd2119 100644
|
||||
--- a/src/test/regress/sql/foreign_key.sql
|
||||
+++ b/src/test/regress/sql/foreign_key.sql
|
||||
@@ -1435,7 +1435,7 @@ ALTER TABLE fk_partitioned_fk_6 ATTACH PARTITION fk_partitioned_pk_6 FOR VALUES
|
||||
@@ -3301,7 +3300,7 @@ index bb82aa4aa2..dd8a05e24d 100644
|
||||
-- Check that the invalid secrets were re-hashed. A re-hashed secret
|
||||
-- should not contain the original salt.
|
||||
diff --git a/src/test/regress/sql/privileges.sql b/src/test/regress/sql/privileges.sql
|
||||
index 5880bc018d..27aa952b18 100644
|
||||
index b7e1cb6cdd..6e5a2217f1 100644
|
||||
--- a/src/test/regress/sql/privileges.sql
|
||||
+++ b/src/test/regress/sql/privileges.sql
|
||||
@@ -24,18 +24,18 @@ RESET client_min_messages;
|
||||
@@ -3363,7 +3362,7 @@ index 5880bc018d..27aa952b18 100644
|
||||
|
||||
ALTER GROUP regress_priv_group1 ADD USER regress_priv_user4;
|
||||
|
||||
@@ -1157,7 +1157,7 @@ SELECT has_table_privilege('regress_priv_user1', 'atest4', 'SELECT WITH GRANT OP
|
||||
@@ -1160,7 +1160,7 @@ SELECT has_table_privilege('regress_priv_user1', 'atest4', 'SELECT WITH GRANT OP
|
||||
|
||||
-- security-restricted operations
|
||||
\c -
|
||||
@@ -3372,7 +3371,7 @@ index 5880bc018d..27aa952b18 100644
|
||||
|
||||
-- Check that index expressions and predicates are run as the table's owner
|
||||
|
||||
@@ -1653,8 +1653,8 @@ DROP SCHEMA testns CASCADE;
|
||||
@@ -1656,8 +1656,8 @@ DROP SCHEMA testns CASCADE;
|
||||
-- Change owner of the schema & and rename of new schema owner
|
||||
\c -
|
||||
|
||||
@@ -3383,7 +3382,7 @@ index 5880bc018d..27aa952b18 100644
|
||||
|
||||
SET SESSION ROLE regress_schemauser1;
|
||||
CREATE SCHEMA testns;
|
||||
@@ -1748,7 +1748,7 @@ DROP USER regress_priv_user8; -- does not exist
|
||||
@@ -1751,7 +1751,7 @@ DROP USER regress_priv_user8; -- does not exist
|
||||
|
||||
|
||||
-- permissions with LOCK TABLE
|
||||
@@ -3392,7 +3391,7 @@ index 5880bc018d..27aa952b18 100644
|
||||
CREATE TABLE lock_table (a int);
|
||||
|
||||
-- LOCK TABLE and SELECT permission
|
||||
@@ -1851,7 +1851,7 @@ DROP USER regress_locktable_user;
|
||||
@@ -1854,7 +1854,7 @@ DROP USER regress_locktable_user;
|
||||
-- switch to superuser
|
||||
\c -
|
||||
|
||||
@@ -3401,7 +3400,7 @@ index 5880bc018d..27aa952b18 100644
|
||||
|
||||
SELECT has_table_privilege('regress_readallstats','pg_backend_memory_contexts','SELECT'); -- no
|
||||
SELECT has_table_privilege('regress_readallstats','pg_shmem_allocations','SELECT'); -- no
|
||||
@@ -1871,10 +1871,10 @@ RESET ROLE;
|
||||
@@ -1874,10 +1874,10 @@ RESET ROLE;
|
||||
DROP ROLE regress_readallstats;
|
||||
|
||||
-- test role grantor machinery
|
||||
@@ -3416,7 +3415,7 @@ index 5880bc018d..27aa952b18 100644
|
||||
|
||||
GRANT regress_group TO regress_group_direct_manager WITH INHERIT FALSE, ADMIN TRUE;
|
||||
GRANT regress_group_direct_manager TO regress_group_indirect_manager;
|
||||
@@ -1896,9 +1896,9 @@ DROP ROLE regress_group_indirect_manager;
|
||||
@@ -1899,9 +1899,9 @@ DROP ROLE regress_group_indirect_manager;
|
||||
DROP ROLE regress_group_member;
|
||||
|
||||
-- test SET and INHERIT options with object ownership changes
|
||||
@@ -3429,7 +3428,7 @@ index 5880bc018d..27aa952b18 100644
|
||||
CREATE SCHEMA regress_roleoption;
|
||||
GRANT CREATE, USAGE ON SCHEMA regress_roleoption TO PUBLIC;
|
||||
GRANT regress_roleoption_donor TO regress_roleoption_protagonist WITH INHERIT TRUE, SET FALSE;
|
||||
@@ -1926,9 +1926,9 @@ DROP ROLE regress_roleoption_donor;
|
||||
@@ -1929,9 +1929,9 @@ DROP ROLE regress_roleoption_donor;
|
||||
DROP ROLE regress_roleoption_recipient;
|
||||
|
||||
-- MAINTAIN
|
||||
|
||||
@@ -2,23 +2,6 @@ diff --git a/expected/ut-A.out b/expected/ut-A.out
|
||||
index da723b8..5328114 100644
|
||||
--- a/expected/ut-A.out
|
||||
+++ b/expected/ut-A.out
|
||||
@@ -9,13 +9,16 @@ SET search_path TO public;
|
||||
----
|
||||
-- No.A-1-1-3
|
||||
CREATE EXTENSION pg_hint_plan;
|
||||
+LOG: Sending request to compute_ctl: http://localhost:3081/extension_server/pg_hint_plan
|
||||
-- No.A-1-2-3
|
||||
DROP EXTENSION pg_hint_plan;
|
||||
-- No.A-1-1-4
|
||||
CREATE SCHEMA other_schema;
|
||||
CREATE EXTENSION pg_hint_plan SCHEMA other_schema;
|
||||
+LOG: Sending request to compute_ctl: http://localhost:3081/extension_server/pg_hint_plan
|
||||
ERROR: extension "pg_hint_plan" must be installed in schema "hint_plan"
|
||||
CREATE EXTENSION pg_hint_plan;
|
||||
+LOG: Sending request to compute_ctl: http://localhost:3081/extension_server/pg_hint_plan
|
||||
DROP SCHEMA other_schema;
|
||||
----
|
||||
---- No. A-5-1 comment pattern
|
||||
@@ -3175,6 +3178,7 @@ SELECT s.query, s.calls
|
||||
FROM public.pg_stat_statements s
|
||||
JOIN pg_catalog.pg_database d
|
||||
@@ -27,18 +10,6 @@ index da723b8..5328114 100644
|
||||
ORDER BY 1;
|
||||
query | calls
|
||||
--------------------------------------+-------
|
||||
diff --git a/expected/ut-fdw.out b/expected/ut-fdw.out
|
||||
index d372459..6282afe 100644
|
||||
--- a/expected/ut-fdw.out
|
||||
+++ b/expected/ut-fdw.out
|
||||
@@ -7,6 +7,7 @@ SET pg_hint_plan.debug_print TO on;
|
||||
SET client_min_messages TO LOG;
|
||||
SET pg_hint_plan.enable_hint TO on;
|
||||
CREATE EXTENSION file_fdw;
|
||||
+LOG: Sending request to compute_ctl: http://localhost:3081/extension_server/file_fdw
|
||||
CREATE SERVER file_server FOREIGN DATA WRAPPER file_fdw;
|
||||
CREATE USER MAPPING FOR PUBLIC SERVER file_server;
|
||||
CREATE FOREIGN TABLE ft1 (id int, val int) SERVER file_server OPTIONS (format 'csv', filename :'filename');
|
||||
diff --git a/sql/ut-A.sql b/sql/ut-A.sql
|
||||
index 7c7d58a..4fd1a07 100644
|
||||
--- a/sql/ut-A.sql
|
||||
|
||||
@@ -1,24 +1,3 @@
|
||||
diff --git a/expected/ut-A.out b/expected/ut-A.out
|
||||
index e7d68a1..65a056c 100644
|
||||
--- a/expected/ut-A.out
|
||||
+++ b/expected/ut-A.out
|
||||
@@ -9,13 +9,16 @@ SET search_path TO public;
|
||||
----
|
||||
-- No.A-1-1-3
|
||||
CREATE EXTENSION pg_hint_plan;
|
||||
+LOG: Sending request to compute_ctl: http://localhost:3081/extension_server/pg_hint_plan
|
||||
-- No.A-1-2-3
|
||||
DROP EXTENSION pg_hint_plan;
|
||||
-- No.A-1-1-4
|
||||
CREATE SCHEMA other_schema;
|
||||
CREATE EXTENSION pg_hint_plan SCHEMA other_schema;
|
||||
+LOG: Sending request to compute_ctl: http://localhost:3081/extension_server/pg_hint_plan
|
||||
ERROR: extension "pg_hint_plan" must be installed in schema "hint_plan"
|
||||
CREATE EXTENSION pg_hint_plan;
|
||||
+LOG: Sending request to compute_ctl: http://localhost:3081/extension_server/pg_hint_plan
|
||||
DROP SCHEMA other_schema;
|
||||
----
|
||||
---- No. A-5-1 comment pattern
|
||||
diff --git a/expected/ut-J.out b/expected/ut-J.out
|
||||
index 2fa3c70..314e929 100644
|
||||
--- a/expected/ut-J.out
|
||||
@@ -160,15 +139,3 @@ index a09bd34..0ad227c 100644
|
||||
error hint:
|
||||
|
||||
explain_filter
|
||||
diff --git a/expected/ut-fdw.out b/expected/ut-fdw.out
|
||||
index 017fa4b..98d989b 100644
|
||||
--- a/expected/ut-fdw.out
|
||||
+++ b/expected/ut-fdw.out
|
||||
@@ -7,6 +7,7 @@ SET pg_hint_plan.debug_print TO on;
|
||||
SET client_min_messages TO LOG;
|
||||
SET pg_hint_plan.enable_hint TO on;
|
||||
CREATE EXTENSION file_fdw;
|
||||
+LOG: Sending request to compute_ctl: http://localhost:3081/extension_server/file_fdw
|
||||
CREATE SERVER file_server FOREIGN DATA WRAPPER file_fdw;
|
||||
CREATE USER MAPPING FOR PUBLIC SERVER file_server;
|
||||
CREATE FOREIGN TABLE ft1 (id int, val int) SERVER file_server OPTIONS (format 'csv', filename :'filename');
|
||||
|
||||
@@ -1,12 +1,6 @@
|
||||
commit 46b38d3e46f9cd6c70d9b189dd6ff4abaa17cf5e
|
||||
Author: Alexander Bayandin <alexander@neon.tech>
|
||||
Date: Sat Nov 30 18:29:32 2024 +0000
|
||||
|
||||
Fix v8 9.7.37 compilation on Debian 12
|
||||
|
||||
diff --git a/patches/code/84cf3230a9680aac3b73c410c2b758760b6d3066.patch b/patches/code/84cf3230a9680aac3b73c410c2b758760b6d3066.patch
|
||||
new file mode 100644
|
||||
index 0000000..f0a5dc7
|
||||
index 0000000..fae1cb3
|
||||
--- /dev/null
|
||||
+++ b/patches/code/84cf3230a9680aac3b73c410c2b758760b6d3066.patch
|
||||
@@ -0,0 +1,30 @@
|
||||
@@ -35,8 +29,21 @@ index 0000000..f0a5dc7
|
||||
+@@ -5,6 +5,7 @@
|
||||
+ #ifndef V8_HEAP_CPPGC_PREFINALIZER_HANDLER_H_
|
||||
+ #define V8_HEAP_CPPGC_PREFINALIZER_HANDLER_H_
|
||||
+
|
||||
+
|
||||
++#include <utility>
|
||||
+ #include <vector>
|
||||
+
|
||||
+
|
||||
+ #include "include/cppgc/prefinalizer.h"
|
||||
diff --git a/plv8.cc b/plv8.cc
|
||||
index c1ce883..6e47e94 100644
|
||||
--- a/plv8.cc
|
||||
+++ b/plv8.cc
|
||||
@@ -379,7 +379,7 @@ _PG_init(void)
|
||||
NULL,
|
||||
&plv8_v8_flags,
|
||||
NULL,
|
||||
- PGC_USERSET, 0,
|
||||
+ PGC_SUSET, 0,
|
||||
#if PG_VERSION_NUM >= 90100
|
||||
NULL,
|
||||
#endif
|
||||
13
compute/patches/plv8_v3.2.3.patch
Normal file
13
compute/patches/plv8_v3.2.3.patch
Normal file
@@ -0,0 +1,13 @@
|
||||
diff --git a/plv8.cc b/plv8.cc
|
||||
index edfa2aa..623e7f2 100644
|
||||
--- a/plv8.cc
|
||||
+++ b/plv8.cc
|
||||
@@ -385,7 +385,7 @@ _PG_init(void)
|
||||
NULL,
|
||||
&plv8_v8_flags,
|
||||
NULL,
|
||||
- PGC_USERSET, 0,
|
||||
+ PGC_SUSET, 0,
|
||||
#if PG_VERSION_NUM >= 90100
|
||||
NULL,
|
||||
#endif
|
||||
@@ -188,7 +188,7 @@ impl ComputeState {
|
||||
|
||||
COMPUTE_CTL_UP.reset();
|
||||
COMPUTE_CTL_UP
|
||||
.with_label_values(&[&BUILD_TAG, format!("{}", status).as_str()])
|
||||
.with_label_values(&[&BUILD_TAG, status.to_string().as_str()])
|
||||
.set(1);
|
||||
}
|
||||
|
||||
@@ -360,6 +360,14 @@ impl ComputeNode {
|
||||
this.prewarm_postgres()?;
|
||||
}
|
||||
|
||||
// Set the up metric with Empty status before starting the HTTP server.
|
||||
// That way on the first metric scrape, an external observer will see us
|
||||
// as 'up' and 'empty' (unless the compute was started with a spec or
|
||||
// already configured by control plane).
|
||||
COMPUTE_CTL_UP
|
||||
.with_label_values(&[&BUILD_TAG, ComputeStatus::Empty.to_string().as_str()])
|
||||
.set(1);
|
||||
|
||||
// Launch the external HTTP server first, so that we can serve control plane
|
||||
// requests while configuration is still in progress.
|
||||
crate::http::server::Server::External {
|
||||
@@ -369,19 +377,13 @@ impl ComputeNode {
|
||||
}
|
||||
.launch(&this);
|
||||
|
||||
// The internal HTTP server is needed for a further activation by control plane
|
||||
// if compute was started for a pool, so we have to start server before hanging
|
||||
// waiting for a spec.
|
||||
// The internal HTTP server could be launched later, but there isn't much
|
||||
// sense in waiting.
|
||||
crate::http::server::Server::Internal {
|
||||
port: this.params.internal_http_port,
|
||||
}
|
||||
.launch(&this);
|
||||
|
||||
// HTTP server is running, so we can officially declare compute_ctl as 'up'
|
||||
COMPUTE_CTL_UP
|
||||
.with_label_values(&[&BUILD_TAG, ComputeStatus::Empty.to_string().as_str()])
|
||||
.set(1);
|
||||
|
||||
// If we got a spec from the CLI already, use that. Otherwise wait for the
|
||||
// control plane to pass it to us with a /configure HTTP request
|
||||
let pspec = if let Some(cli_spec) = cli_spec {
|
||||
|
||||
@@ -59,9 +59,12 @@ impl AsyncAuthorizeRequest<Body> for Authorize {
|
||||
Box::pin(async move {
|
||||
let request_id = request.extract_parts::<RequestId>().await.unwrap();
|
||||
|
||||
// TODO: Remove this check after a successful rollout
|
||||
if jwks.keys.is_empty() {
|
||||
warn!(%request_id, "Authorization has not been configured");
|
||||
// TODO: Remove this stanza after teaching neon_local and the
|
||||
// regression tests to use a JWT + JWKS.
|
||||
//
|
||||
// https://github.com/neondatabase/neon/issues/11316
|
||||
if cfg!(feature = "testing") {
|
||||
warn!(%request_id, "Skipping compute_ctl authorization check");
|
||||
|
||||
return Ok(request);
|
||||
}
|
||||
@@ -110,8 +113,6 @@ impl AsyncAuthorizeRequest<Body> for Authorize {
|
||||
impl Authorize {
|
||||
/// Verify the token using the JSON Web Key set and return the token data.
|
||||
fn verify(jwks: &JwkSet, token: &str, validation: &Validation) -> Result<TokenData<Claims>> {
|
||||
debug_assert!(!jwks.keys.is_empty());
|
||||
|
||||
for jwk in jwks.keys.iter() {
|
||||
let decoding_key = match DecodingKey::from_jwk(jwk) {
|
||||
Ok(key) => key,
|
||||
|
||||
@@ -419,7 +419,7 @@ impl ComputeNode {
|
||||
.iter()
|
||||
.filter_map(|val| val.parse::<usize>().ok())
|
||||
.map(|val| if val > 1 { val - 1 } else { 1 })
|
||||
.last()
|
||||
.next_back()
|
||||
.unwrap_or(3)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -545,6 +545,11 @@ impl PageServerNode {
|
||||
.map(|x| x.parse::<u64>())
|
||||
.transpose()
|
||||
.context("Failed to parse 'gc_compaction_ratio_percent' as integer")?,
|
||||
sampling_ratio: settings
|
||||
.remove("sampling_ratio")
|
||||
.map(serde_json::from_str)
|
||||
.transpose()
|
||||
.context("Falied to parse 'sampling_ratio'")?,
|
||||
};
|
||||
if !settings.is_empty() {
|
||||
bail!("Unrecognized tenant settings: {settings:?}")
|
||||
|
||||
@@ -385,8 +385,6 @@ where
|
||||
async fn main() -> anyhow::Result<()> {
|
||||
let cli = Cli::parse();
|
||||
|
||||
let storcon_client = Client::new(cli.api.clone(), cli.jwt.clone());
|
||||
|
||||
let ssl_ca_certs = match &cli.ssl_ca_file {
|
||||
Some(ssl_ca_file) => {
|
||||
let buf = tokio::fs::read(ssl_ca_file).await?;
|
||||
@@ -401,9 +399,11 @@ async fn main() -> anyhow::Result<()> {
|
||||
}
|
||||
let http_client = http_client.build()?;
|
||||
|
||||
let storcon_client = Client::new(http_client.clone(), cli.api.clone(), cli.jwt.clone());
|
||||
|
||||
let mut trimmed = cli.api.to_string();
|
||||
trimmed.pop();
|
||||
let vps_client = mgmt_api::Client::new(http_client, trimmed, cli.jwt.as_deref());
|
||||
let vps_client = mgmt_api::Client::new(http_client.clone(), trimmed, cli.jwt.as_deref());
|
||||
|
||||
match cli.command {
|
||||
Command::NodeRegister {
|
||||
@@ -1056,7 +1056,7 @@ async fn main() -> anyhow::Result<()> {
|
||||
const DEFAULT_MIGRATE_CONCURRENCY: usize = 8;
|
||||
let mut stream = futures::stream::iter(moves)
|
||||
.map(|mv| {
|
||||
let client = Client::new(cli.api.clone(), cli.jwt.clone());
|
||||
let client = Client::new(http_client.clone(), cli.api.clone(), cli.jwt.clone());
|
||||
async move {
|
||||
client
|
||||
.dispatch::<TenantShardMigrateRequest, TenantShardMigrateResponse>(
|
||||
|
||||
@@ -21,6 +21,7 @@ in this repository.
|
||||
- [WAL Redo](./pageserver-walredo.md)
|
||||
- [Page cache](./pageserver-pagecache.md)
|
||||
- [Storage](./pageserver-storage.md)
|
||||
- [Compaction](./pageserver-compaction.md)
|
||||
- [Processing a GetPage request](./pageserver-processing-getpage.md)
|
||||
- [Processing WAL](./pageserver-processing-wal.md)
|
||||
|
||||
|
||||
110
docs/pageserver-compaction.md
Normal file
110
docs/pageserver-compaction.md
Normal file
@@ -0,0 +1,110 @@
|
||||
# Pageserver Compaction
|
||||
|
||||
Lifted from <https://www.notion.so/neondatabase/Rough-Notes-on-Compaction-1baf189e004780859e65ef63b85cfa81?pvs=4>.
|
||||
|
||||
Updated 2025-03-26.
|
||||
|
||||
## Pages and WAL
|
||||
|
||||
Postgres stores data in 8 KB pages, identified by a page number.
|
||||
|
||||
The WAL contains a sequence of page writes: either images (complete page contents) or deltas (patches applied to images). Each write is identified by its byte position in the WAL, aka LSN.
|
||||
|
||||
Each page version is thus identified by page@LSN. Postgres may read pages at past LSNs.
|
||||
|
||||
Pageservers ingest WAL by writing WAL records into a key/value store keyed by page@LSN.
|
||||
|
||||
Pageservers materialize pages for Postgres reads by finding the most recent page image and applying all subsequent page deltas, up to the read LSN.
|
||||
|
||||
## Compaction: Why?
|
||||
|
||||
Pageservers store page@LSN keys in a key/value store using a custom variant of an LSM tree. Each timeline on each tenant shard has its own LSM tree.
|
||||
|
||||
When Pageservers write new page@LSN entries, they are appended unordered to an ephemeral layer file. When the ephemeral layer file exceeds `checkpoint_distance` (default 256 MB), the key/value pairs are sorted by key and written out to a layer file (for efficient lookups).
|
||||
|
||||
As WAL writes continue, more layer files accumulate.
|
||||
|
||||
Reads must search through the layer files to find the page’s image and deltas. The more layer files accumulate, the more la yer files reads must search through before they find a page image, aka read amplification.
|
||||
|
||||
Compaction’s job is to:
|
||||
|
||||
- Reduce read amplification by reorganizing and combining layer files.
|
||||
- Remove old garbage from layer files.
|
||||
|
||||
As part of this, it may combine several page deltas into a single page image where possible.
|
||||
|
||||
## Compaction: How?
|
||||
|
||||
Neon uses a non-standard variant of an LSM tree made up of two levels of layer files: L0 and L1.
|
||||
|
||||
Compaction runs in two phases: L0→L1 compaction, and L1 image compaction.
|
||||
|
||||
L0 contains a stack of L0 layers at decreasing LSN ranges. These have been flushed sequentially from ephemeral layers. Each L0 layer covers the entire page space (page 0 to ~infinity) and the LSN range that was ingested into it. L0 layers are therefore particularly bad for read amp, since every read must search all L0 layers below the read LSN. For example:
|
||||
|
||||
```
|
||||
| Page 0-99 @ LSN 0400-04ff |
|
||||
| Page 0-99 @ LSN 0300-03ff |
|
||||
| Page 0-99 @ LSN 0200-02ff |
|
||||
| Page 0-99 @ LSN 0100-01ff |
|
||||
| Page 0-99 @ LSN 0000-00ff |
|
||||
```
|
||||
|
||||
L0→L1 compaction takes the bottom-most chunk of L0 layer files of between `compaction_threshold` (default 10) and `compaction_upper_limit` (default 20) layers. It uses merge-sort to write out sorted L1 delta layers of size `compaction_target_size` (default 128 MB).
|
||||
|
||||
L1 typically consists of a “bed” of image layers with materialized page images at a specific LSN, and then delta layers of various page/LSN ranges above them with page deltas. For example:
|
||||
|
||||
```
|
||||
Delta layers: | 30-84@0310-04ff |
|
||||
Delta layers: | 10-42@0200-02ff | | 65-92@0174-02aa |
|
||||
Image layers: | 0-39@0100 | 40-79@0100 | 80-99@0100 |
|
||||
```
|
||||
|
||||
L1 image compaction scans across the L1 keyspace at some LSN, materializes page images by reading the image and delta layers below the LSN (via vectored reads), and writes out new sorted image layers of roughly size `compaction_target_size` (default 128 MB) at that LSN.
|
||||
|
||||
Layer files below the new image files’ LSN can be garbage collected when they are no longer needed for PITR.
|
||||
|
||||
Even though the old layer files are not immediately garbage collected, the new image layers help with read amp because reads can stop traversing the layer stack as soon as they encounter a page image.
|
||||
|
||||
## Compaction: When?
|
||||
|
||||
Pageservers run a `compaction_loop` background task for each tenant shard. Every `compaction_period` (default 20 seconds) it will wake up and check if any of the shard’s timelines need compaction. Additionally, L0 layer flushes will eagerly wake the compaction loop if the L0 count exceeds `compaction_threshold` (default 10).
|
||||
|
||||
L0 compaction runs if the number of L0 layers exceeds `compaction_threshold` (default 10).
|
||||
|
||||
L1 image compaction runs across sections of the L1 keyspace that have at least `image_creation_threshold` (default 3) delta layers overlapping image layers.
|
||||
|
||||
At most `CONCURRENT_BACKGROUND_TASKS` (default 3 / 4 * CPUs = 6) background tasks can run concurrently on a Pageserver, including compaction. Further compaction tasks must wait.
|
||||
|
||||
Because L0 layers cause the most read amp (they overlap the entire keyspace and only contain page deltas), they are aggressively compacted down:
|
||||
|
||||
- L0 is compacted down across all tenant timelines before L1 compaction is attempted (`compaction_l0_first`).
|
||||
- L0 compaction uses a separate concurrency limit of `CONCURRENT_L0_COMPACTION_TASKS` (default 3 / 4 * CPUs = 6) to avoid waiting for other tasks (`compaction_l0_semaphore`).
|
||||
- If L0 compaction is needed on any tenant timeline, L1 image compaction will yield to start an immediate L0 compaction run (except for compaction run via admin APIs).
|
||||
|
||||
## Backpressure
|
||||
|
||||
With sustained heavy write loads, new L0 layers may be flushed faster than they can be compacted down. This can cause an unbounded buildup of read amplification and compaction debt, which can take hours to resolve even after the writes stop.
|
||||
|
||||
To avoid this and allow compaction to keep up, layer flushes will slow writes down to apply backpressure on the workload:
|
||||
|
||||
- At `l0_flush_delay_threshold` (default 30) L0 layers, layer flushes are delayed by the flush duration, such that they take 2x as long.
|
||||
- At `l0_flush_stall_threshold` (default disabled) L0 layers, layer flushes stall entirely until the L0 count falls back below the threshold. This is currently disabled because we don’t trust L0 compaction to be responsive enough.
|
||||
|
||||
This backpressure is propagated to the compute by waiting for layer flushes when WAL ingestion rolls the ephemeral layer. The compute will significantly slow down WAL writes at:
|
||||
|
||||
- `max_replication_write_lag` (default 500 MB), when Pageserver WAL ingestion lags
|
||||
- `max_replication_flush_lag` (default 10 GB), when Pageserver L0 flushes lag
|
||||
|
||||
Combined, this means that the compute will backpressure when there are 30 L0 layers (30 * 256 MB = 7.7 GB) and the Pageserver WAL ingestion lags the compute by 500 MB, for a total of ~8 GB L0+ephemeral compaction debt on a single shard.
|
||||
|
||||
Since we only delay L0 flushes by 2x when backpressuring, and haven’t enabled stalls, it is still possible for read amp to increase unbounded if compaction is too slow (although we haven’t seen this in practice). But this is considered better than stalling flushes and causing unavailability for as long as it takes L0 compaction to react, since we don’t trust it to be fast enough — at the expense of continually increasing read latency and CPU usage for this tenant. We should either enable stalls when we have enough confidence in L0 compaction, or scale the flush delay by the number of L0 layers to apply increasing backpressure.
|
||||
|
||||
## Circuit Breaker
|
||||
|
||||
Compaction can fail, often repeatedly. This can happen e.g. due to data corruption, faulty hardware, S3 outages, etc.
|
||||
|
||||
If compaction fails, the compaction loop will naïvely try and fail again almost immediately. It may only fail after doing a significant amount of wasted work, while holding onto the background task semaphore.
|
||||
|
||||
To avoid repeatedly doing wasted work and starving out other compaction jobs, each tenant has a compaction circuit breaker. After 5 repeated compaction failures, the circuit breaker trips and disables compaction for the next 24 hours, before resetting the breaker and trying again. This disables compaction across all tenant timelines (faulty or not).
|
||||
|
||||
Disabling compaction for a long time is dangerous, since it can lead to unbounded read amp and compaction debt, and continuous workload backpressure. However, continually failing would not help either. Tripped circuit breakers trigger an alert and must be investigated promptly.
|
||||
@@ -30,20 +30,6 @@ static SERVE_METRICS_COUNT: Lazy<IntCounter> = Lazy::new(|| {
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
static SERVE_METRICS_COUNT_2: Lazy<IntCounter> = Lazy::new(|| {
|
||||
register_int_counter!(
|
||||
"libmetrics_metric_handler_requests_2_total",
|
||||
"Number of metric requests made"
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
static SERVE_METRICS_COUNT_3: Lazy<IntCounter> = Lazy::new(|| {
|
||||
register_int_counter!(
|
||||
"libmetrics_metric_handler_requests_total_3",
|
||||
"Number of metric requests made"
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
static X_REQUEST_ID_HEADER_STR: &str = "x-request-id";
|
||||
|
||||
@@ -265,8 +251,6 @@ impl std::io::Write for ChannelWriter {
|
||||
|
||||
pub async fn prometheus_metrics_handler(_req: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
SERVE_METRICS_COUNT.inc();
|
||||
SERVE_METRICS_COUNT_2.inc();
|
||||
SERVE_METRICS_COUNT_3.inc();
|
||||
|
||||
let started_at = std::time::Instant::now();
|
||||
|
||||
|
||||
@@ -91,14 +91,14 @@ impl Server {
|
||||
Ok(tls_stream) => tls_stream,
|
||||
Err(err) => {
|
||||
if !suppress_io_error(&err) {
|
||||
info!("Failed to accept TLS connection: {err:#}");
|
||||
info!(%remote_addr, "Failed to accept TLS connection: {err:#}");
|
||||
}
|
||||
return;
|
||||
}
|
||||
};
|
||||
if let Err(err) = Self::serve_connection(tls_stream, service, cancel).await {
|
||||
if !suppress_hyper_error(&err) {
|
||||
info!("Failed to serve HTTPS connection: {err:#}");
|
||||
info!(%remote_addr, "Failed to serve HTTPS connection: {err:#}");
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -106,7 +106,7 @@ impl Server {
|
||||
// Handle HTTP connection.
|
||||
if let Err(err) = Self::serve_connection(tcp_stream, service, cancel).await {
|
||||
if !suppress_hyper_error(&err) {
|
||||
info!("Failed to serve HTTP connection: {err:#}");
|
||||
info!(%remote_addr, "Failed to serve HTTP connection: {err:#}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -34,6 +34,7 @@ postgres_backend.workspace = true
|
||||
nix = {workspace = true, optional = true}
|
||||
reqwest.workspace = true
|
||||
rand.workspace = true
|
||||
tracing-utils.workspace = true
|
||||
|
||||
[dev-dependencies]
|
||||
bincode.workspace = true
|
||||
|
||||
@@ -51,9 +51,54 @@ pub struct NodeMetadata {
|
||||
/// If there cannot be a static default value because we need to make runtime
|
||||
/// checks to determine the default, make it an `Option` (which defaults to None).
|
||||
/// The runtime check should be done in the consuming crate, i.e., `pageserver`.
|
||||
///
|
||||
/// Unknown fields are silently ignored during deserialization.
|
||||
/// The alternative, which we used in the past, was to set `deny_unknown_fields`,
|
||||
/// which fails deserialization, and hence pageserver startup, if there is an unknown field.
|
||||
/// The reason we don't do that anymore is that it complicates
|
||||
/// usage of config fields for feature flagging, which we commonly do for
|
||||
/// region-by-region rollouts.
|
||||
/// The complications mainly arise because the `pageserver.toml` contents on a
|
||||
/// prod server have a separate lifecycle from the pageserver binary.
|
||||
/// For instance, `pageserver.toml` contents today are defined in the internal
|
||||
/// infra repo, and thus introducing a new config field to pageserver and
|
||||
/// rolling it out to prod servers are separate commits in separate repos
|
||||
/// that can't be made or rolled back atomically.
|
||||
/// Rollbacks in particular pose a risk with deny_unknown_fields because
|
||||
/// the old pageserver binary may reject a new config field, resulting in
|
||||
/// an outage unless the person doing the pageserver rollback remembers
|
||||
/// to also revert the commit that added the config field in to the
|
||||
/// `pageserver.toml` templates in the internal infra repo.
|
||||
/// (A pre-deploy config check would eliminate this risk during rollbacks,
|
||||
/// cf [here](https://github.com/neondatabase/cloud/issues/24349).)
|
||||
/// In addition to this compatibility problem during emergency rollbacks,
|
||||
/// deny_unknown_fields adds further complications when decomissioning a feature
|
||||
/// flag: with deny_unknown_fields, we can't remove a flag from the [`ConfigToml`]
|
||||
/// until all prod servers' `pageserver.toml` files have been updated to a version
|
||||
/// that doesn't specify the flag. Otherwise new software would fail to start up.
|
||||
/// This adds the requirement for an intermediate step where the new config field
|
||||
/// is accepted but ignored, prolonging the decomissioning process by an entire
|
||||
/// release cycle.
|
||||
/// By contrast with unknown fields silently ignored, decomissioning a feature
|
||||
/// flag is a one-step process: we can skip the intermediate step and straight
|
||||
/// remove the field from the [`ConfigToml`]. We leave the field in the
|
||||
/// `pageserver.toml` files on prod servers until we reach certainty that we
|
||||
/// will not roll back to old software whose behavior was dependent on config.
|
||||
/// Then we can remove the field from the templates in the internal infra repo.
|
||||
/// This process is [documented internally](
|
||||
/// https://docs.neon.build/storage/pageserver_configuration.html).
|
||||
///
|
||||
/// Note that above relaxed compatbility for the config format does NOT APPLY
|
||||
/// TO THE STORAGE FORMAT. As general guidance, when introducing storage format
|
||||
/// changes, ensure that the potential rollback target version will be compatible
|
||||
/// with the new format. This must hold regardless of what flags are set in in the `pageserver.toml`:
|
||||
/// any format version that exists in an environment must be compatible with the software that runs there.
|
||||
/// Use a pageserver.toml flag only to gate whether software _writes_ the new format.
|
||||
/// For more compatibility considerations, refer to [internal docs](
|
||||
/// https://docs.neon.build/storage/compat.html?highlight=compat#format-versions--compatibility)
|
||||
#[serde_as]
|
||||
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
|
||||
#[serde(default, deny_unknown_fields)]
|
||||
#[serde(default)]
|
||||
pub struct ConfigToml {
|
||||
// types mapped 1:1 into the runtime PageServerConfig type
|
||||
pub listen_pg_addr: String,
|
||||
@@ -134,10 +179,10 @@ pub struct ConfigToml {
|
||||
pub load_previous_heatmap: Option<bool>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub generate_unarchival_heatmap: Option<bool>,
|
||||
pub tracing: Option<Tracing>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
|
||||
#[serde(deny_unknown_fields)]
|
||||
pub struct DiskUsageEvictionTaskConfig {
|
||||
pub max_usage_pct: utils::serde_percent::Percent,
|
||||
pub min_avail_bytes: u64,
|
||||
@@ -152,13 +197,11 @@ pub struct DiskUsageEvictionTaskConfig {
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
|
||||
#[serde(tag = "mode", rename_all = "kebab-case")]
|
||||
#[serde(deny_unknown_fields)]
|
||||
pub enum PageServicePipeliningConfig {
|
||||
Serial,
|
||||
Pipelined(PageServicePipeliningConfigPipelined),
|
||||
}
|
||||
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
|
||||
#[serde(deny_unknown_fields)]
|
||||
pub struct PageServicePipeliningConfigPipelined {
|
||||
/// Causes runtime errors if larger than max get_vectored batch size.
|
||||
pub max_batch_size: NonZeroUsize,
|
||||
@@ -174,7 +217,6 @@ pub enum PageServiceProtocolPipelinedExecutionStrategy {
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
|
||||
#[serde(tag = "mode", rename_all = "kebab-case")]
|
||||
#[serde(deny_unknown_fields)]
|
||||
pub enum GetVectoredConcurrentIo {
|
||||
/// The read path is fully sequential: layers are visited
|
||||
/// one after the other and IOs are issued and waited upon
|
||||
@@ -191,6 +233,54 @@ pub enum GetVectoredConcurrentIo {
|
||||
SidecarTask,
|
||||
}
|
||||
|
||||
#[derive(Debug, Copy, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
|
||||
pub struct Ratio {
|
||||
pub numerator: usize,
|
||||
pub denominator: usize,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
|
||||
pub struct OtelExporterConfig {
|
||||
pub endpoint: String,
|
||||
pub protocol: OtelExporterProtocol,
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub timeout: Duration,
|
||||
}
|
||||
|
||||
#[derive(Debug, Copy, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
|
||||
#[serde(rename_all = "kebab-case")]
|
||||
pub enum OtelExporterProtocol {
|
||||
Grpc,
|
||||
HttpBinary,
|
||||
HttpJson,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
|
||||
pub struct Tracing {
|
||||
pub sampling_ratio: Ratio,
|
||||
pub export_config: OtelExporterConfig,
|
||||
}
|
||||
|
||||
impl From<&OtelExporterConfig> for tracing_utils::ExportConfig {
|
||||
fn from(val: &OtelExporterConfig) -> Self {
|
||||
tracing_utils::ExportConfig {
|
||||
endpoint: Some(val.endpoint.clone()),
|
||||
protocol: val.protocol.into(),
|
||||
timeout: val.timeout,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<OtelExporterProtocol> for tracing_utils::Protocol {
|
||||
fn from(val: OtelExporterProtocol) -> Self {
|
||||
match val {
|
||||
OtelExporterProtocol::Grpc => tracing_utils::Protocol::Grpc,
|
||||
OtelExporterProtocol::HttpJson => tracing_utils::Protocol::HttpJson,
|
||||
OtelExporterProtocol::HttpBinary => tracing_utils::Protocol::HttpBinary,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub mod statvfs {
|
||||
pub mod mock {
|
||||
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
|
||||
@@ -245,7 +335,7 @@ pub struct MaxVectoredReadBytes(pub NonZeroUsize);
|
||||
|
||||
/// Tenant-level configuration values, used for various purposes.
|
||||
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
|
||||
#[serde(deny_unknown_fields, default)]
|
||||
#[serde(default)]
|
||||
pub struct TenantConfigToml {
|
||||
// Flush out an inmemory layer, if it's holding WAL older than this
|
||||
// This puts a backstop on how much WAL needs to be re-digested if the
|
||||
@@ -367,6 +457,9 @@ pub struct TenantConfigToml {
|
||||
/// The ratio that triggers the auto gc-compaction. If (the total size of layers between L2 LSN and gc-horizon) / (size below the L2 LSN)
|
||||
/// is above this ratio, gc-compaction will be triggered.
|
||||
pub gc_compaction_ratio_percent: u64,
|
||||
/// Tenant level performance sampling ratio override. Controls the ratio of get page requests
|
||||
/// that will get perf sampling for the tenant.
|
||||
pub sampling_ratio: Option<Ratio>,
|
||||
}
|
||||
|
||||
pub mod defaults {
|
||||
@@ -537,6 +630,7 @@ impl Default for ConfigToml {
|
||||
validate_wal_contiguity: None,
|
||||
load_previous_heatmap: None,
|
||||
generate_unarchival_heatmap: None,
|
||||
tracing: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -652,6 +746,7 @@ impl Default for TenantConfigToml {
|
||||
gc_compaction_enabled: DEFAULT_GC_COMPACTION_ENABLED,
|
||||
gc_compaction_initial_threshold_kb: DEFAULT_GC_COMPACTION_INITIAL_THRESHOLD_KB,
|
||||
gc_compaction_ratio_percent: DEFAULT_GC_COMPACTION_RATIO_PERCENT,
|
||||
sampling_ratio: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -23,6 +23,7 @@ use utils::lsn::Lsn;
|
||||
use utils::postgres_client::PostgresClientProtocol;
|
||||
use utils::{completion, serde_system_time};
|
||||
|
||||
use crate::config::Ratio;
|
||||
use crate::key::{CompactKey, Key};
|
||||
use crate::reltag::RelTag;
|
||||
use crate::shard::{ShardCount, ShardStripeSize, TenantShardId};
|
||||
@@ -568,6 +569,8 @@ pub struct TenantConfigPatch {
|
||||
pub gc_compaction_initial_threshold_kb: FieldPatch<u64>,
|
||||
#[serde(skip_serializing_if = "FieldPatch::is_noop")]
|
||||
pub gc_compaction_ratio_percent: FieldPatch<u64>,
|
||||
#[serde(skip_serializing_if = "FieldPatch::is_noop")]
|
||||
pub sampling_ratio: FieldPatch<Option<Ratio>>,
|
||||
}
|
||||
|
||||
/// Like [`crate::config::TenantConfigToml`], but preserves the information
|
||||
@@ -688,6 +691,9 @@ pub struct TenantConfig {
|
||||
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub gc_compaction_ratio_percent: Option<u64>,
|
||||
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub sampling_ratio: Option<Option<Ratio>>,
|
||||
}
|
||||
|
||||
impl TenantConfig {
|
||||
@@ -730,6 +736,7 @@ impl TenantConfig {
|
||||
mut gc_compaction_enabled,
|
||||
mut gc_compaction_initial_threshold_kb,
|
||||
mut gc_compaction_ratio_percent,
|
||||
mut sampling_ratio,
|
||||
} = self;
|
||||
|
||||
patch.checkpoint_distance.apply(&mut checkpoint_distance);
|
||||
@@ -824,6 +831,7 @@ impl TenantConfig {
|
||||
patch
|
||||
.gc_compaction_ratio_percent
|
||||
.apply(&mut gc_compaction_ratio_percent);
|
||||
patch.sampling_ratio.apply(&mut sampling_ratio);
|
||||
|
||||
Ok(Self {
|
||||
checkpoint_distance,
|
||||
@@ -860,6 +868,7 @@ impl TenantConfig {
|
||||
gc_compaction_enabled,
|
||||
gc_compaction_initial_threshold_kb,
|
||||
gc_compaction_ratio_percent,
|
||||
sampling_ratio,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -961,6 +970,7 @@ impl TenantConfig {
|
||||
gc_compaction_ratio_percent: self
|
||||
.gc_compaction_ratio_percent
|
||||
.unwrap_or(global_conf.gc_compaction_ratio_percent),
|
||||
sampling_ratio: self.sampling_ratio.unwrap_or(global_conf.sampling_ratio),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1094,7 +1104,7 @@ pub struct CompactionAlgorithmSettings {
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Eq, Clone, Deserialize, Serialize)]
|
||||
#[serde(tag = "mode", rename_all = "kebab-case", deny_unknown_fields)]
|
||||
#[serde(tag = "mode", rename_all = "kebab-case")]
|
||||
pub enum L0FlushConfig {
|
||||
#[serde(rename_all = "snake_case")]
|
||||
Direct { max_concurrency: NonZeroUsize },
|
||||
@@ -1418,11 +1428,6 @@ pub struct TimelineInfo {
|
||||
pub last_record_lsn: Lsn,
|
||||
pub prev_record_lsn: Option<Lsn>,
|
||||
|
||||
/// Legacy field, retained for one version to enable old storage controller to
|
||||
/// decode (it was a mandatory field).
|
||||
#[serde(default, rename = "latest_gc_cutoff_lsn")]
|
||||
pub _unused: Lsn,
|
||||
|
||||
/// The LSN up to which GC has advanced: older data may still exist but it is not available for clients.
|
||||
/// This LSN is not suitable for deciding where to create branches etc: use [`TimelineInfo::min_readable_lsn`] instead,
|
||||
/// as it is easier to reason about.
|
||||
|
||||
@@ -558,7 +558,7 @@ async fn upload_large_enough_file(
|
||||
) -> usize {
|
||||
let header = bytes::Bytes::from_static("remote blob data content".as_bytes());
|
||||
let body = bytes::Bytes::from(vec![0u8; 1024]);
|
||||
let contents = std::iter::once(header).chain(std::iter::repeat(body).take(128));
|
||||
let contents = std::iter::once(header).chain(std::iter::repeat_n(body, 128));
|
||||
|
||||
let len = contents.clone().fold(0, |acc, next| acc + next.len());
|
||||
|
||||
|
||||
@@ -71,6 +71,7 @@ pub struct PeerInfo {
|
||||
pub ts: Instant,
|
||||
pub pg_connstr: String,
|
||||
pub http_connstr: String,
|
||||
pub https_connstr: Option<String>,
|
||||
}
|
||||
|
||||
pub type FullTransactionId = u64;
|
||||
@@ -227,6 +228,8 @@ pub struct TimelineDeleteResult {
|
||||
pub dir_existed: bool,
|
||||
}
|
||||
|
||||
pub type TenantDeleteResult = std::collections::HashMap<String, TimelineDeleteResult>;
|
||||
|
||||
fn lsn_invalid() -> Lsn {
|
||||
Lsn::INVALID
|
||||
}
|
||||
@@ -259,6 +262,8 @@ pub struct SkTimelineInfo {
|
||||
pub safekeeper_connstr: Option<String>,
|
||||
#[serde(default)]
|
||||
pub http_connstr: Option<String>,
|
||||
#[serde(default)]
|
||||
pub https_connstr: Option<String>,
|
||||
// Minimum of all active RO replicas flush LSN
|
||||
#[serde(default = "lsn_invalid")]
|
||||
pub standby_horizon: Lsn,
|
||||
|
||||
@@ -14,6 +14,7 @@ tokio = { workspace = true, features = ["rt", "rt-multi-thread"] }
|
||||
tracing.workspace = true
|
||||
tracing-opentelemetry.workspace = true
|
||||
tracing-subscriber.workspace = true
|
||||
pin-project-lite.workspace = true
|
||||
|
||||
[dev-dependencies]
|
||||
tracing-subscriber.workspace = true # For examples in docs
|
||||
|
||||
@@ -31,10 +31,10 @@
|
||||
//! .init();
|
||||
//! }
|
||||
//! ```
|
||||
#![deny(unsafe_code)]
|
||||
#![deny(clippy::undocumented_unsafe_blocks)]
|
||||
|
||||
pub mod http;
|
||||
pub mod perf_span;
|
||||
|
||||
use opentelemetry::KeyValue;
|
||||
use opentelemetry::trace::TracerProvider;
|
||||
|
||||
144
libs/tracing-utils/src/perf_span.rs
Normal file
144
libs/tracing-utils/src/perf_span.rs
Normal file
@@ -0,0 +1,144 @@
|
||||
//! Crutch module to work around tracing infrastructure deficiencies
|
||||
//!
|
||||
//! We wish to collect granular request spans without impacting performance
|
||||
//! by much. Ideally, we should have zero overhead for a sampling rate of 0.
|
||||
//!
|
||||
//! The approach taken by the pageserver crate is to use a completely different
|
||||
//! span hierarchy for the performance spans. Spans are explicitly stored in
|
||||
//! the request context and use a different [`tracing::Subscriber`] in order
|
||||
//! to avoid expensive filtering.
|
||||
//!
|
||||
//! [`tracing::Span`] instances record their [`tracing::Dispatch`] and, implcitly,
|
||||
//! their [`tracing::Subscriber`] at creation time. However, upon exiting the span,
|
||||
//! the global default [`tracing::Dispatch`] is used. This is problematic if one
|
||||
//! wishes to juggle different subscribers.
|
||||
//!
|
||||
//! In order to work around this, this module provides a [`PerfSpan`] type which
|
||||
//! wraps a [`Span`] and sets the default subscriber when exiting the span. This
|
||||
//! achieves the correct routing.
|
||||
//!
|
||||
//! There's also a modified version of [`tracing::Instrument`] which works with
|
||||
//! [`PerfSpan`].
|
||||
|
||||
use core::{
|
||||
future::Future,
|
||||
marker::Sized,
|
||||
mem::ManuallyDrop,
|
||||
pin::Pin,
|
||||
task::{Context, Poll},
|
||||
};
|
||||
use pin_project_lite::pin_project;
|
||||
use tracing::{Dispatch, span::Span};
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct PerfSpan {
|
||||
inner: ManuallyDrop<Span>,
|
||||
dispatch: Dispatch,
|
||||
}
|
||||
|
||||
#[must_use = "once a span has been entered, it should be exited"]
|
||||
pub struct PerfSpanEntered<'a> {
|
||||
span: &'a PerfSpan,
|
||||
}
|
||||
|
||||
impl PerfSpan {
|
||||
pub fn new(span: Span, dispatch: Dispatch) -> Self {
|
||||
Self {
|
||||
inner: ManuallyDrop::new(span),
|
||||
dispatch,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn enter(&self) -> PerfSpanEntered {
|
||||
if let Some(ref id) = self.inner.id() {
|
||||
self.dispatch.enter(id);
|
||||
}
|
||||
|
||||
PerfSpanEntered { span: self }
|
||||
}
|
||||
|
||||
pub fn inner(&self) -> &Span {
|
||||
&self.inner
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for PerfSpan {
|
||||
fn drop(&mut self) {
|
||||
// Bring the desired dispatch into scope before explicitly calling
|
||||
// the span destructor. This routes the span exit to the correct
|
||||
// [`tracing::Subscriber`].
|
||||
let _dispatch_guard = tracing::dispatcher::set_default(&self.dispatch);
|
||||
// SAFETY: ManuallyDrop in Drop implementation
|
||||
unsafe { ManuallyDrop::drop(&mut self.inner) }
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for PerfSpanEntered<'_> {
|
||||
fn drop(&mut self) {
|
||||
assert!(self.span.inner.id().is_some());
|
||||
|
||||
let _dispatch_guard = tracing::dispatcher::set_default(&self.span.dispatch);
|
||||
self.span.dispatch.exit(&self.span.inner.id().unwrap());
|
||||
}
|
||||
}
|
||||
|
||||
pub trait PerfInstrument: Sized {
|
||||
fn instrument(self, span: PerfSpan) -> PerfInstrumented<Self> {
|
||||
PerfInstrumented {
|
||||
inner: ManuallyDrop::new(self),
|
||||
span,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pin_project! {
|
||||
#[project = PerfInstrumentedProj]
|
||||
#[derive(Debug, Clone)]
|
||||
#[must_use = "futures do nothing unless you `.await` or poll them"]
|
||||
pub struct PerfInstrumented<T> {
|
||||
// `ManuallyDrop` is used here to to enter instrument `Drop` by entering
|
||||
// `Span` and executing `ManuallyDrop::drop`.
|
||||
#[pin]
|
||||
inner: ManuallyDrop<T>,
|
||||
span: PerfSpan,
|
||||
}
|
||||
|
||||
impl<T> PinnedDrop for PerfInstrumented<T> {
|
||||
fn drop(this: Pin<&mut Self>) {
|
||||
let this = this.project();
|
||||
let _enter = this.span.enter();
|
||||
// SAFETY: 1. `Pin::get_unchecked_mut()` is safe, because this isn't
|
||||
// different from wrapping `T` in `Option` and calling
|
||||
// `Pin::set(&mut this.inner, None)`, except avoiding
|
||||
// additional memory overhead.
|
||||
// 2. `ManuallyDrop::drop()` is safe, because
|
||||
// `PinnedDrop::drop()` is guaranteed to be called only
|
||||
// once.
|
||||
unsafe { ManuallyDrop::drop(this.inner.get_unchecked_mut()) }
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, T> PerfInstrumentedProj<'a, T> {
|
||||
/// Get a mutable reference to the [`Span`] a pinned mutable reference to
|
||||
/// the wrapped type.
|
||||
fn span_and_inner_pin_mut(self) -> (&'a mut PerfSpan, Pin<&'a mut T>) {
|
||||
// SAFETY: As long as `ManuallyDrop<T>` does not move, `T` won't move
|
||||
// and `inner` is valid, because `ManuallyDrop::drop` is called
|
||||
// only inside `Drop` of the `Instrumented`.
|
||||
let inner = unsafe { self.inner.map_unchecked_mut(|v| &mut **v) };
|
||||
(self.span, inner)
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Future> Future for PerfInstrumented<T> {
|
||||
type Output = T::Output;
|
||||
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
let (span, inner) = self.project().span_and_inner_pin_mut();
|
||||
let _enter = span.enter();
|
||||
inner.poll(cx)
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Sized> PerfInstrument for T {}
|
||||
26
libs/utils/src/elapsed_accum.rs
Normal file
26
libs/utils/src/elapsed_accum.rs
Normal file
@@ -0,0 +1,26 @@
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct ElapsedAccum {
|
||||
accum: Duration,
|
||||
}
|
||||
|
||||
impl ElapsedAccum {
|
||||
pub fn get(&self) -> Duration {
|
||||
self.accum
|
||||
}
|
||||
pub fn guard(&mut self) -> impl Drop + '_ {
|
||||
let start = Instant::now();
|
||||
scopeguard::guard(start, |last_wait_at| {
|
||||
self.accum += Instant::now() - last_wait_at;
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn measure<Fut, O>(&mut self, fut: Fut) -> O
|
||||
where
|
||||
Fut: Future<Output = O>,
|
||||
{
|
||||
let _guard = self.guard();
|
||||
fut.await
|
||||
}
|
||||
}
|
||||
@@ -93,6 +93,8 @@ pub mod try_rcu;
|
||||
|
||||
pub mod guard_arc_swap;
|
||||
|
||||
pub mod elapsed_accum;
|
||||
|
||||
#[cfg(target_os = "linux")]
|
||||
pub mod linux_socket_ioctl;
|
||||
|
||||
|
||||
@@ -111,9 +111,17 @@ impl<T> OnceCell<T> {
|
||||
}
|
||||
}
|
||||
|
||||
/// Like [`Self::get_or_init_detached_measured`], but without out parameter for time spent waiting.
|
||||
pub async fn get_or_init_detached(&self) -> Result<Guard<'_, T>, InitPermit> {
|
||||
self.get_or_init_detached_measured(None).await
|
||||
}
|
||||
|
||||
/// Returns a guard to an existing initialized value, or returns an unique initialization
|
||||
/// permit which can be used to initialize this `OnceCell` using `OnceCell::set`.
|
||||
pub async fn get_or_init_detached(&self) -> Result<Guard<'_, T>, InitPermit> {
|
||||
pub async fn get_or_init_detached_measured(
|
||||
&self,
|
||||
mut wait_time: Option<&mut crate::elapsed_accum::ElapsedAccum>,
|
||||
) -> Result<Guard<'_, T>, InitPermit> {
|
||||
// It looks like OnceCell::get_or_init could be implemented using this method instead of
|
||||
// duplication. However, that makes the future be !Send due to possibly holding on to the
|
||||
// MutexGuard over an await point.
|
||||
@@ -125,12 +133,16 @@ impl<T> OnceCell<T> {
|
||||
}
|
||||
guard.init_semaphore.clone()
|
||||
};
|
||||
|
||||
{
|
||||
let permit = {
|
||||
// increment the count for the duration of queued
|
||||
let _guard = CountWaitingInitializers::start(self);
|
||||
sem.acquire().await
|
||||
let fut = sem.acquire();
|
||||
if let Some(wait_time) = wait_time.as_mut() {
|
||||
wait_time.measure(fut).await
|
||||
} else {
|
||||
fut.await
|
||||
}
|
||||
};
|
||||
|
||||
let Ok(permit) = permit else {
|
||||
|
||||
@@ -86,17 +86,17 @@ impl Client {
|
||||
resp.json().await.map_err(Error::ReceiveBody)
|
||||
}
|
||||
|
||||
/// Get an arbitrary path and returning a streaming Response. This function is suitable
|
||||
/// for pass-through/proxy use cases where we don't care what the response content looks
|
||||
/// like.
|
||||
/// Send an HTTP request to an arbitrary path with a desired HTTP method and returning a streaming
|
||||
/// Response. This function is suitable for pass-through/proxy use cases where we don't care
|
||||
/// what the response content looks like.
|
||||
///
|
||||
/// Use/add one of the properly typed methods below if you know aren't proxying, and
|
||||
/// know what kind of response you expect.
|
||||
pub async fn get_raw(&self, path: String) -> Result<reqwest::Response> {
|
||||
pub async fn op_raw(&self, method: Method, path: String) -> Result<reqwest::Response> {
|
||||
debug_assert!(path.starts_with('/'));
|
||||
let uri = format!("{}{}", self.mgmt_api_endpoint, path);
|
||||
|
||||
let mut req = self.client.request(Method::GET, uri);
|
||||
let mut req = self.client.request(method, uri);
|
||||
if let Some(value) = &self.authorization_header {
|
||||
req = req.header(reqwest::header::AUTHORIZATION, value);
|
||||
}
|
||||
|
||||
@@ -16,7 +16,7 @@ use http_utils::tls_certs::ReloadingCertificateResolver;
|
||||
use metrics::launch_timestamp::{LaunchTimestamp, set_launch_timestamp_metric};
|
||||
use metrics::set_build_info_metric;
|
||||
use nix::sys::socket::{setsockopt, sockopt};
|
||||
use pageserver::config::{PageServerConf, PageserverIdentity};
|
||||
use pageserver::config::{PageServerConf, PageserverIdentity, ignored_fields};
|
||||
use pageserver::controller_upcall_client::StorageControllerUpcallClient;
|
||||
use pageserver::deletion_queue::DeletionQueue;
|
||||
use pageserver::disk_usage_eviction_task::{self, launch_disk_usage_global_eviction_task};
|
||||
@@ -35,6 +35,7 @@ use tokio::signal::unix::SignalKind;
|
||||
use tokio::time::Instant;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::*;
|
||||
use tracing_utils::OtelGuard;
|
||||
use utils::auth::{JwtAuth, SwappableJwtAuth};
|
||||
use utils::crashsafe::syncfs;
|
||||
use utils::logging::TracingErrorLayerEnablement;
|
||||
@@ -97,7 +98,7 @@ fn main() -> anyhow::Result<()> {
|
||||
env::set_current_dir(&workdir)
|
||||
.with_context(|| format!("Failed to set application's current dir to '{workdir}'"))?;
|
||||
|
||||
let conf = initialize_config(&identity_file_path, &cfg_file_path, &workdir)?;
|
||||
let (conf, ignored) = initialize_config(&identity_file_path, &cfg_file_path, &workdir)?;
|
||||
|
||||
// Initialize logging.
|
||||
//
|
||||
@@ -118,6 +119,21 @@ fn main() -> anyhow::Result<()> {
|
||||
logging::Output::Stdout,
|
||||
)?;
|
||||
|
||||
let otel_enablement = match &conf.tracing {
|
||||
Some(cfg) => tracing_utils::OtelEnablement::Enabled {
|
||||
service_name: "pageserver".to_string(),
|
||||
export_config: (&cfg.export_config).into(),
|
||||
runtime: *COMPUTE_REQUEST_RUNTIME,
|
||||
},
|
||||
None => tracing_utils::OtelEnablement::Disabled,
|
||||
};
|
||||
|
||||
let otel_guard = tracing_utils::init_performance_tracing(otel_enablement);
|
||||
|
||||
if otel_guard.is_some() {
|
||||
info!(?conf.tracing, "starting with OTEL tracing enabled");
|
||||
}
|
||||
|
||||
// mind the order required here: 1. logging, 2. panic_hook, 3. sentry.
|
||||
// disarming this hook on pageserver, because we never tear down tracing.
|
||||
logging::replace_panic_hook_with_tracing_panic_hook().forget();
|
||||
@@ -128,7 +144,17 @@ fn main() -> anyhow::Result<()> {
|
||||
&[("node_id", &conf.id.to_string())],
|
||||
);
|
||||
|
||||
// after setting up logging, log the effective IO engine choice and read path implementations
|
||||
// Warn about ignored config items; see pageserver_api::config::ConfigToml
|
||||
// doc comment for rationale why we prefer this over serde(deny_unknown_fields).
|
||||
{
|
||||
let ignored_fields::Paths { paths } = &ignored;
|
||||
for path in paths {
|
||||
warn!(?path, "ignoring unknown configuration item");
|
||||
}
|
||||
}
|
||||
|
||||
// Log configuration items for feature-flag-like config
|
||||
// (maybe we should automate this with a visitor?).
|
||||
info!(?conf.virtual_file_io_engine, "starting with virtual_file IO engine");
|
||||
info!(?conf.virtual_file_io_mode, "starting with virtual_file IO mode");
|
||||
info!(?conf.wal_receiver_protocol, "starting with WAL receiver protocol");
|
||||
@@ -191,7 +217,7 @@ fn main() -> anyhow::Result<()> {
|
||||
tracing::info!("Initializing page_cache...");
|
||||
page_cache::init(conf.page_cache_size);
|
||||
|
||||
start_pageserver(launch_ts, conf).context("Failed to start pageserver")?;
|
||||
start_pageserver(launch_ts, conf, ignored, otel_guard).context("Failed to start pageserver")?;
|
||||
|
||||
scenario.teardown();
|
||||
Ok(())
|
||||
@@ -201,7 +227,7 @@ fn initialize_config(
|
||||
identity_file_path: &Utf8Path,
|
||||
cfg_file_path: &Utf8Path,
|
||||
workdir: &Utf8Path,
|
||||
) -> anyhow::Result<&'static PageServerConf> {
|
||||
) -> anyhow::Result<(&'static PageServerConf, ignored_fields::Paths)> {
|
||||
// The deployment orchestrator writes out an indentity file containing the node id
|
||||
// for all pageservers. This file is the source of truth for the node id. In order
|
||||
// to allow for rolling back pageserver releases, the node id is also included in
|
||||
@@ -230,16 +256,36 @@ fn initialize_config(
|
||||
|
||||
let config_file_contents =
|
||||
std::fs::read_to_string(cfg_file_path).context("read config file from filesystem")?;
|
||||
let config_toml = serde_path_to_error::deserialize(
|
||||
toml_edit::de::Deserializer::from_str(&config_file_contents)
|
||||
.context("build toml deserializer")?,
|
||||
)
|
||||
.context("deserialize config toml")?;
|
||||
|
||||
// Deserialize the config file contents into a ConfigToml.
|
||||
let config_toml: pageserver_api::config::ConfigToml = {
|
||||
let deserializer = toml_edit::de::Deserializer::from_str(&config_file_contents)
|
||||
.context("build toml deserializer")?;
|
||||
let mut path_to_error_track = serde_path_to_error::Track::new();
|
||||
let deserializer =
|
||||
serde_path_to_error::Deserializer::new(deserializer, &mut path_to_error_track);
|
||||
serde::Deserialize::deserialize(deserializer).context("deserialize config toml")?
|
||||
};
|
||||
|
||||
// Find unknown fields by re-serializing the parsed ConfigToml and comparing it to the on-disk file.
|
||||
// Any fields that are only in the on-disk version are unknown.
|
||||
// (The assumption here is that the ConfigToml doesn't to skip_serializing_if.)
|
||||
// (Make sure to read the ConfigToml doc comment on why we only want to warn about, but not fail startup, on unknown fields).
|
||||
let ignored = {
|
||||
let ondisk_toml = config_file_contents
|
||||
.parse::<toml_edit::DocumentMut>()
|
||||
.context("parse original config as toml document")?;
|
||||
let parsed_toml = toml_edit::ser::to_document(&config_toml)
|
||||
.context("re-serialize config to toml document")?;
|
||||
pageserver::config::ignored_fields::find(ondisk_toml, parsed_toml)
|
||||
};
|
||||
|
||||
// Construct the runtime god object (it's called PageServerConf but actually is just global shared state).
|
||||
let conf = PageServerConf::parse_and_validate(identity.id, config_toml, workdir)
|
||||
.context("runtime-validation of config toml")?;
|
||||
let conf = Box::leak(Box::new(conf));
|
||||
|
||||
Ok(Box::leak(Box::new(conf)))
|
||||
Ok((conf, ignored))
|
||||
}
|
||||
|
||||
struct WaitForPhaseResult<F: std::future::Future + Unpin> {
|
||||
@@ -290,6 +336,8 @@ fn startup_checkpoint(started_at: Instant, phase: &str, human_phase: &str) {
|
||||
fn start_pageserver(
|
||||
launch_ts: &'static LaunchTimestamp,
|
||||
conf: &'static PageServerConf,
|
||||
ignored: ignored_fields::Paths,
|
||||
otel_guard: Option<OtelGuard>,
|
||||
) -> anyhow::Result<()> {
|
||||
// Monotonic time for later calculating startup duration
|
||||
let started_startup_at = Instant::now();
|
||||
@@ -312,7 +360,7 @@ fn start_pageserver(
|
||||
pageserver::metrics::tokio_epoll_uring::Collector::new(),
|
||||
))
|
||||
.unwrap();
|
||||
pageserver::preinitialize_metrics(conf);
|
||||
pageserver::preinitialize_metrics(conf, ignored);
|
||||
|
||||
// If any failpoints were set from FAILPOINTS environment variable,
|
||||
// print them to the log for debugging purposes
|
||||
@@ -675,13 +723,21 @@ fn start_pageserver(
|
||||
|
||||
// Spawn a task to listen for libpq connections. It will spawn further tasks
|
||||
// for each connection. We created the listener earlier already.
|
||||
let page_service = page_service::spawn(conf, tenant_manager.clone(), pg_auth, {
|
||||
let _entered = COMPUTE_REQUEST_RUNTIME.enter(); // TcpListener::from_std requires it
|
||||
pageserver_listener
|
||||
.set_nonblocking(true)
|
||||
.context("set listener to nonblocking")?;
|
||||
tokio::net::TcpListener::from_std(pageserver_listener).context("create tokio listener")?
|
||||
});
|
||||
let perf_trace_dispatch = otel_guard.as_ref().map(|g| g.dispatch.clone());
|
||||
let page_service = page_service::spawn(
|
||||
conf,
|
||||
tenant_manager.clone(),
|
||||
pg_auth,
|
||||
perf_trace_dispatch,
|
||||
{
|
||||
let _entered = COMPUTE_REQUEST_RUNTIME.enter(); // TcpListener::from_std requires it
|
||||
pageserver_listener
|
||||
.set_nonblocking(true)
|
||||
.context("set listener to nonblocking")?;
|
||||
tokio::net::TcpListener::from_std(pageserver_listener)
|
||||
.context("create tokio listener")?
|
||||
},
|
||||
);
|
||||
|
||||
// All started up! Now just sit and wait for shutdown signal.
|
||||
BACKGROUND_RUNTIME.block_on(async move {
|
||||
|
||||
@@ -4,6 +4,8 @@
|
||||
//! file, or on the command line.
|
||||
//! See also `settings.md` for better description on every parameter.
|
||||
|
||||
pub mod ignored_fields;
|
||||
|
||||
use std::env;
|
||||
use std::num::NonZeroUsize;
|
||||
use std::sync::Arc;
|
||||
@@ -215,6 +217,8 @@ pub struct PageServerConf {
|
||||
|
||||
/// When set, include visible layers in the next uploaded heatmaps of an unarchived timeline.
|
||||
pub generate_unarchival_heatmap: bool,
|
||||
|
||||
pub tracing: Option<pageserver_api::config::Tracing>,
|
||||
}
|
||||
|
||||
/// Token for authentication to safekeepers
|
||||
@@ -386,6 +390,7 @@ impl PageServerConf {
|
||||
validate_wal_contiguity,
|
||||
load_previous_heatmap,
|
||||
generate_unarchival_heatmap,
|
||||
tracing,
|
||||
} = config_toml;
|
||||
|
||||
let mut conf = PageServerConf {
|
||||
@@ -435,6 +440,7 @@ impl PageServerConf {
|
||||
wal_receiver_protocol,
|
||||
page_service_pipelining,
|
||||
get_vectored_concurrent_io,
|
||||
tracing,
|
||||
|
||||
// ------------------------------------------------------------
|
||||
// fields that require additional validation or custom handling
|
||||
@@ -506,6 +512,17 @@ impl PageServerConf {
|
||||
);
|
||||
}
|
||||
|
||||
if let Some(tracing_config) = conf.tracing.as_ref() {
|
||||
let ratio = &tracing_config.sampling_ratio;
|
||||
ensure!(
|
||||
ratio.denominator != 0 && ratio.denominator >= ratio.numerator,
|
||||
format!(
|
||||
"Invalid sampling ratio: {}/{}",
|
||||
ratio.numerator, ratio.denominator
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
IndexEntry::validate_checkpoint_distance(conf.default_tenant_conf.checkpoint_distance)
|
||||
.map_err(anyhow::Error::msg)
|
||||
.with_context(|| {
|
||||
@@ -545,7 +562,6 @@ impl PageServerConf {
|
||||
}
|
||||
|
||||
#[derive(serde::Deserialize, serde::Serialize)]
|
||||
#[serde(deny_unknown_fields)]
|
||||
pub struct PageserverIdentity {
|
||||
pub id: NodeId,
|
||||
}
|
||||
@@ -617,82 +633,4 @@ mod tests {
|
||||
PageServerConf::parse_and_validate(NodeId(0), config_toml, &workdir)
|
||||
.expect("parse_and_validate");
|
||||
}
|
||||
|
||||
/// If there's a typo in the pageserver config, we'd rather catch that typo
|
||||
/// and fail pageserver startup than silently ignoring the typo, leaving whoever
|
||||
/// made it in the believe that their config change is effective.
|
||||
///
|
||||
/// The default in serde is to allow unknown fields, so, we rely
|
||||
/// on developer+review discipline to add `deny_unknown_fields` when adding
|
||||
/// new structs to the config, and these tests here as a regression test.
|
||||
///
|
||||
/// The alternative to all of this would be to allow unknown fields in the config.
|
||||
/// To catch them, we could have a config check tool or mgmt API endpoint that
|
||||
/// compares the effective config with the TOML on disk and makes sure that
|
||||
/// the on-disk TOML is a strict subset of the effective config.
|
||||
mod unknown_fields_handling {
|
||||
macro_rules! test {
|
||||
($short_name:ident, $input:expr) => {
|
||||
#[test]
|
||||
fn $short_name() {
|
||||
let input = $input;
|
||||
let err = toml_edit::de::from_str::<pageserver_api::config::ConfigToml>(&input)
|
||||
.expect_err("some_invalid_field is an invalid field");
|
||||
dbg!(&err);
|
||||
assert!(err.to_string().contains("some_invalid_field"));
|
||||
}
|
||||
};
|
||||
}
|
||||
use indoc::indoc;
|
||||
|
||||
test!(
|
||||
toplevel,
|
||||
indoc! {r#"
|
||||
some_invalid_field = 23
|
||||
"#}
|
||||
);
|
||||
|
||||
test!(
|
||||
toplevel_nested,
|
||||
indoc! {r#"
|
||||
[some_invalid_field]
|
||||
foo = 23
|
||||
"#}
|
||||
);
|
||||
|
||||
test!(
|
||||
disk_usage_based_eviction,
|
||||
indoc! {r#"
|
||||
[disk_usage_based_eviction]
|
||||
some_invalid_field = 23
|
||||
"#}
|
||||
);
|
||||
|
||||
test!(
|
||||
tenant_config,
|
||||
indoc! {r#"
|
||||
[tenant_config]
|
||||
some_invalid_field = 23
|
||||
"#}
|
||||
);
|
||||
|
||||
test!(
|
||||
l0_flush,
|
||||
indoc! {r#"
|
||||
[l0_flush]
|
||||
mode = "direct"
|
||||
some_invalid_field = 23
|
||||
"#}
|
||||
);
|
||||
|
||||
// TODO: fix this => https://github.com/neondatabase/neon/issues/8915
|
||||
// test!(
|
||||
// remote_storage_config,
|
||||
// indoc! {r#"
|
||||
// [remote_storage_config]
|
||||
// local_path = "/nonexistent"
|
||||
// some_invalid_field = 23
|
||||
// "#}
|
||||
// );
|
||||
}
|
||||
}
|
||||
|
||||
179
pageserver/src/config/ignored_fields.rs
Normal file
179
pageserver/src/config/ignored_fields.rs
Normal file
@@ -0,0 +1,179 @@
|
||||
//! Check for fields in the on-disk config file that were ignored when
|
||||
//! deserializing [`pageserver_api::config::ConfigToml`].
|
||||
//!
|
||||
//! This could have been part of the [`pageserver_api::config`] module,
|
||||
//! but the way we identify unused fields in this module
|
||||
//! is specific to the format (TOML) and the implementation of the
|
||||
//! deserialization for that format ([`toml_edit`]).
|
||||
|
||||
use std::collections::HashSet;
|
||||
|
||||
use itertools::Itertools;
|
||||
|
||||
/// Pass in the user-specified config and the re-serialized [`pageserver_api::config::ConfigToml`].
|
||||
/// The returned [`Paths`] contains the paths to the fields that were ignored by deserialization
|
||||
/// of the [`pageserver_api::config::ConfigToml`].
|
||||
pub fn find(user_specified: toml_edit::DocumentMut, reserialized: toml_edit::DocumentMut) -> Paths {
|
||||
let user_specified = paths(user_specified);
|
||||
let reserialized = paths(reserialized);
|
||||
fn paths(doc: toml_edit::DocumentMut) -> HashSet<String> {
|
||||
let mut out = Vec::new();
|
||||
let mut visitor = PathsVisitor::new(&mut out);
|
||||
visitor.visit_table_like(doc.as_table());
|
||||
HashSet::from_iter(out)
|
||||
}
|
||||
|
||||
let mut ignored = HashSet::new();
|
||||
|
||||
// O(n) because of HashSet
|
||||
for path in user_specified {
|
||||
if !reserialized.contains(&path) {
|
||||
ignored.insert(path);
|
||||
}
|
||||
}
|
||||
|
||||
Paths {
|
||||
paths: ignored
|
||||
.into_iter()
|
||||
// sort lexicographically for deterministic output
|
||||
.sorted()
|
||||
.collect(),
|
||||
}
|
||||
}
|
||||
|
||||
pub struct Paths {
|
||||
pub paths: Vec<String>,
|
||||
}
|
||||
|
||||
struct PathsVisitor<'a> {
|
||||
stack: Vec<String>,
|
||||
out: &'a mut Vec<String>,
|
||||
}
|
||||
|
||||
impl<'a> PathsVisitor<'a> {
|
||||
fn new(out: &'a mut Vec<String>) -> Self {
|
||||
Self {
|
||||
stack: Vec::new(),
|
||||
out,
|
||||
}
|
||||
}
|
||||
|
||||
fn visit_table_like(&mut self, table_like: &dyn toml_edit::TableLike) {
|
||||
for (entry, item) in table_like.iter() {
|
||||
self.stack.push(entry.to_string());
|
||||
self.visit_item(item);
|
||||
self.stack.pop();
|
||||
}
|
||||
}
|
||||
|
||||
fn visit_item(&mut self, item: &toml_edit::Item) {
|
||||
match item {
|
||||
toml_edit::Item::None => (),
|
||||
toml_edit::Item::Value(value) => self.visit_value(value),
|
||||
toml_edit::Item::Table(table) => {
|
||||
self.visit_table_like(table);
|
||||
}
|
||||
toml_edit::Item::ArrayOfTables(array_of_tables) => {
|
||||
for (i, table) in array_of_tables.iter().enumerate() {
|
||||
self.stack.push(format!("[{i}]"));
|
||||
self.visit_table_like(table);
|
||||
self.stack.pop();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn visit_value(&mut self, value: &toml_edit::Value) {
|
||||
match value {
|
||||
toml_edit::Value::String(_)
|
||||
| toml_edit::Value::Integer(_)
|
||||
| toml_edit::Value::Float(_)
|
||||
| toml_edit::Value::Boolean(_)
|
||||
| toml_edit::Value::Datetime(_) => self.out.push(self.stack.join(".")),
|
||||
toml_edit::Value::Array(array) => {
|
||||
for (i, value) in array.iter().enumerate() {
|
||||
self.stack.push(format!("[{i}]"));
|
||||
self.visit_value(value);
|
||||
self.stack.pop();
|
||||
}
|
||||
}
|
||||
toml_edit::Value::InlineTable(inline_table) => self.visit_table_like(inline_table),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub(crate) mod tests {
|
||||
|
||||
fn test_impl(original: &str, parsed: &str, expect: [&str; 1]) {
|
||||
let original: toml_edit::DocumentMut = original.parse().expect("parse original config");
|
||||
let parsed: toml_edit::DocumentMut = parsed.parse().expect("parse re-serialized config");
|
||||
|
||||
let super::Paths { paths: actual } = super::find(original, parsed);
|
||||
assert_eq!(actual, &expect);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn top_level() {
|
||||
test_impl(
|
||||
r#"
|
||||
[a]
|
||||
b = 1
|
||||
c = 2
|
||||
d = 3
|
||||
"#,
|
||||
r#"
|
||||
[a]
|
||||
b = 1
|
||||
c = 2
|
||||
"#,
|
||||
["a.d"],
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn nested() {
|
||||
test_impl(
|
||||
r#"
|
||||
[a.b.c]
|
||||
d = 23
|
||||
"#,
|
||||
r#"
|
||||
[a]
|
||||
e = 42
|
||||
"#,
|
||||
["a.b.c.d"],
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn array_of_tables() {
|
||||
test_impl(
|
||||
r#"
|
||||
[[a]]
|
||||
b = 1
|
||||
c = 2
|
||||
d = 3
|
||||
"#,
|
||||
r#"
|
||||
[[a]]
|
||||
b = 1
|
||||
c = 2
|
||||
"#,
|
||||
["a.[0].d"],
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn array() {
|
||||
test_impl(
|
||||
r#"
|
||||
foo = [ {bar = 23} ]
|
||||
"#,
|
||||
r#"
|
||||
foo = [ { blup = 42 }]
|
||||
"#,
|
||||
["foo.[0].bar"],
|
||||
);
|
||||
}
|
||||
}
|
||||
@@ -89,7 +89,7 @@
|
||||
//! [`RequestContext`] argument. Functions in the middle of the call chain
|
||||
//! only need to pass it on.
|
||||
|
||||
use std::sync::Arc;
|
||||
use std::{sync::Arc, time::Duration};
|
||||
|
||||
use once_cell::sync::Lazy;
|
||||
use tracing::warn;
|
||||
@@ -100,6 +100,12 @@ use crate::{
|
||||
task_mgr::TaskKind,
|
||||
tenant::Timeline,
|
||||
};
|
||||
use futures::FutureExt;
|
||||
use futures::future::BoxFuture;
|
||||
use std::future::Future;
|
||||
use tracing_utils::perf_span::{PerfInstrument, PerfSpan};
|
||||
|
||||
use tracing::{Dispatch, Span};
|
||||
|
||||
// The main structure of this module, see module-level comment.
|
||||
pub struct RequestContext {
|
||||
@@ -109,6 +115,8 @@ pub struct RequestContext {
|
||||
page_content_kind: PageContentKind,
|
||||
read_path_debug: bool,
|
||||
scope: Scope,
|
||||
perf_span: Option<PerfSpan>,
|
||||
perf_span_dispatch: Option<Dispatch>,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
@@ -263,22 +271,15 @@ impl RequestContextBuilder {
|
||||
page_content_kind: PageContentKind::Unknown,
|
||||
read_path_debug: false,
|
||||
scope: Scope::new_global(),
|
||||
perf_span: None,
|
||||
perf_span_dispatch: None,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
pub fn extend(original: &RequestContext) -> Self {
|
||||
pub fn from(original: &RequestContext) -> Self {
|
||||
Self {
|
||||
// This is like a Copy, but avoid implementing Copy because ordinary users of
|
||||
// RequestContext should always move or ref it.
|
||||
inner: RequestContext {
|
||||
task_kind: original.task_kind,
|
||||
download_behavior: original.download_behavior,
|
||||
access_stats_behavior: original.access_stats_behavior,
|
||||
page_content_kind: original.page_content_kind,
|
||||
read_path_debug: original.read_path_debug,
|
||||
scope: original.scope.clone(),
|
||||
},
|
||||
inner: original.clone(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -316,12 +317,74 @@ impl RequestContextBuilder {
|
||||
self
|
||||
}
|
||||
|
||||
pub fn build(self) -> RequestContext {
|
||||
pub(crate) fn perf_span_dispatch(mut self, dispatch: Option<Dispatch>) -> Self {
|
||||
self.inner.perf_span_dispatch = dispatch;
|
||||
self
|
||||
}
|
||||
|
||||
pub fn root_perf_span<Fn>(mut self, make_span: Fn) -> Self
|
||||
where
|
||||
Fn: FnOnce() -> Span,
|
||||
{
|
||||
assert!(self.inner.perf_span.is_none());
|
||||
assert!(self.inner.perf_span_dispatch.is_some());
|
||||
|
||||
let dispatcher = self.inner.perf_span_dispatch.as_ref().unwrap();
|
||||
let new_span = tracing::dispatcher::with_default(dispatcher, make_span);
|
||||
|
||||
self.inner.perf_span = Some(PerfSpan::new(new_span, dispatcher.clone()));
|
||||
|
||||
self
|
||||
}
|
||||
|
||||
pub fn perf_span<Fn>(mut self, make_span: Fn) -> Self
|
||||
where
|
||||
Fn: FnOnce(&Span) -> Span,
|
||||
{
|
||||
if let Some(ref perf_span) = self.inner.perf_span {
|
||||
assert!(self.inner.perf_span_dispatch.is_some());
|
||||
let dispatcher = self.inner.perf_span_dispatch.as_ref().unwrap();
|
||||
|
||||
let new_span =
|
||||
tracing::dispatcher::with_default(dispatcher, || make_span(perf_span.inner()));
|
||||
|
||||
self.inner.perf_span = Some(PerfSpan::new(new_span, dispatcher.clone()));
|
||||
}
|
||||
|
||||
self
|
||||
}
|
||||
|
||||
pub fn root(self) -> RequestContext {
|
||||
self.inner
|
||||
}
|
||||
|
||||
pub fn attached_child(self) -> RequestContext {
|
||||
self.inner
|
||||
}
|
||||
|
||||
pub fn detached_child(self) -> RequestContext {
|
||||
self.inner
|
||||
}
|
||||
}
|
||||
|
||||
impl RequestContext {
|
||||
/// Private clone implementation
|
||||
///
|
||||
/// Callers should use the [`RequestContextBuilder`] or child spaning APIs of
|
||||
/// [`RequestContext`].
|
||||
fn clone(&self) -> Self {
|
||||
Self {
|
||||
task_kind: self.task_kind,
|
||||
download_behavior: self.download_behavior,
|
||||
access_stats_behavior: self.access_stats_behavior,
|
||||
page_content_kind: self.page_content_kind,
|
||||
read_path_debug: self.read_path_debug,
|
||||
scope: self.scope.clone(),
|
||||
perf_span: self.perf_span.clone(),
|
||||
perf_span_dispatch: self.perf_span_dispatch.clone(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Create a new RequestContext that has no parent.
|
||||
///
|
||||
/// The function is called `new` because, once we add children
|
||||
@@ -337,7 +400,7 @@ impl RequestContext {
|
||||
pub fn new(task_kind: TaskKind, download_behavior: DownloadBehavior) -> Self {
|
||||
RequestContextBuilder::new(task_kind)
|
||||
.download_behavior(download_behavior)
|
||||
.build()
|
||||
.root()
|
||||
}
|
||||
|
||||
/// Create a detached child context for a task that may outlive `self`.
|
||||
@@ -358,7 +421,10 @@ impl RequestContext {
|
||||
///
|
||||
/// We could make new calls to this function fail if `self` is already canceled.
|
||||
pub fn detached_child(&self, task_kind: TaskKind, download_behavior: DownloadBehavior) -> Self {
|
||||
self.child_impl(task_kind, download_behavior)
|
||||
RequestContextBuilder::from(self)
|
||||
.task_kind(task_kind)
|
||||
.download_behavior(download_behavior)
|
||||
.detached_child()
|
||||
}
|
||||
|
||||
/// Create a child of context `self` for a task that shall not outlive `self`.
|
||||
@@ -382,7 +448,7 @@ impl RequestContext {
|
||||
/// The method to wait for child tasks would return an error, indicating
|
||||
/// that the child task was not started because the context was canceled.
|
||||
pub fn attached_child(&self) -> Self {
|
||||
self.child_impl(self.task_kind(), self.download_behavior())
|
||||
RequestContextBuilder::from(self).attached_child()
|
||||
}
|
||||
|
||||
/// Use this function when you should be creating a child context using
|
||||
@@ -397,17 +463,10 @@ impl RequestContext {
|
||||
Self::new(task_kind, download_behavior)
|
||||
}
|
||||
|
||||
fn child_impl(&self, task_kind: TaskKind, download_behavior: DownloadBehavior) -> Self {
|
||||
RequestContextBuilder::extend(self)
|
||||
.task_kind(task_kind)
|
||||
.download_behavior(download_behavior)
|
||||
.build()
|
||||
}
|
||||
|
||||
pub fn with_scope_timeline(&self, timeline: &Arc<Timeline>) -> Self {
|
||||
RequestContextBuilder::extend(self)
|
||||
RequestContextBuilder::from(self)
|
||||
.scope(Scope::new_timeline(timeline))
|
||||
.build()
|
||||
.attached_child()
|
||||
}
|
||||
|
||||
pub(crate) fn with_scope_page_service_pagestream(
|
||||
@@ -416,9 +475,9 @@ impl RequestContext {
|
||||
crate::page_service::TenantManagerTypes,
|
||||
>,
|
||||
) -> Self {
|
||||
RequestContextBuilder::extend(self)
|
||||
RequestContextBuilder::from(self)
|
||||
.scope(Scope::new_page_service_pagestream(timeline_handle))
|
||||
.build()
|
||||
.attached_child()
|
||||
}
|
||||
|
||||
pub fn with_scope_secondary_timeline(
|
||||
@@ -426,28 +485,30 @@ impl RequestContext {
|
||||
tenant_shard_id: &TenantShardId,
|
||||
timeline_id: &TimelineId,
|
||||
) -> Self {
|
||||
RequestContextBuilder::extend(self)
|
||||
RequestContextBuilder::from(self)
|
||||
.scope(Scope::new_secondary_timeline(tenant_shard_id, timeline_id))
|
||||
.build()
|
||||
.attached_child()
|
||||
}
|
||||
|
||||
pub fn with_scope_secondary_tenant(&self, tenant_shard_id: &TenantShardId) -> Self {
|
||||
RequestContextBuilder::extend(self)
|
||||
RequestContextBuilder::from(self)
|
||||
.scope(Scope::new_secondary_tenant(tenant_shard_id))
|
||||
.build()
|
||||
.attached_child()
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub fn with_scope_unit_test(&self) -> Self {
|
||||
RequestContextBuilder::new(TaskKind::UnitTest)
|
||||
RequestContextBuilder::from(self)
|
||||
.task_kind(TaskKind::UnitTest)
|
||||
.scope(Scope::new_unit_test())
|
||||
.build()
|
||||
.attached_child()
|
||||
}
|
||||
|
||||
pub fn with_scope_debug_tools(&self) -> Self {
|
||||
RequestContextBuilder::new(TaskKind::DebugTool)
|
||||
RequestContextBuilder::from(self)
|
||||
.task_kind(TaskKind::DebugTool)
|
||||
.scope(Scope::new_debug_tools())
|
||||
.build()
|
||||
.attached_child()
|
||||
}
|
||||
|
||||
pub fn task_kind(&self) -> TaskKind {
|
||||
@@ -504,4 +565,76 @@ impl RequestContext {
|
||||
Scope::DebugTools { io_size_metrics } => io_size_metrics,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn ondemand_download_wait_observe(&self, duration: Duration) {
|
||||
if duration == Duration::ZERO {
|
||||
return;
|
||||
}
|
||||
|
||||
match &self.scope {
|
||||
Scope::Timeline { arc_arc } => arc_arc
|
||||
.wait_ondemand_download_time
|
||||
.observe(self.task_kind, duration),
|
||||
_ => {
|
||||
use once_cell::sync::Lazy;
|
||||
use std::sync::Mutex;
|
||||
use std::time::Duration;
|
||||
use utils::rate_limit::RateLimit;
|
||||
static LIMIT: Lazy<Mutex<RateLimit>> =
|
||||
Lazy::new(|| Mutex::new(RateLimit::new(Duration::from_secs(1))));
|
||||
let mut guard = LIMIT.lock().unwrap();
|
||||
guard.call2(|rate_limit_stats| {
|
||||
warn!(
|
||||
%rate_limit_stats,
|
||||
backtrace=%std::backtrace::Backtrace::force_capture(),
|
||||
"ondemand downloads should always happen within timeline scope",
|
||||
);
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn perf_follows_from(&self, from: &RequestContext) {
|
||||
if let (Some(span), Some(from_span)) = (&self.perf_span, &from.perf_span) {
|
||||
span.inner().follows_from(from_span.inner());
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn has_perf_span(&self) -> bool {
|
||||
self.perf_span.is_some()
|
||||
}
|
||||
}
|
||||
|
||||
/// [`Future`] extension trait that allow for creating performance
|
||||
/// spans on sampled requests
|
||||
pub(crate) trait PerfInstrumentFutureExt<'a>: Future + Send {
|
||||
/// Instrument this future with a new performance span when the
|
||||
/// provided request context indicates the originator request
|
||||
/// was sampled. Otherwise, just box the future and return it as is.
|
||||
fn maybe_perf_instrument<Fn>(
|
||||
self,
|
||||
ctx: &RequestContext,
|
||||
make_span: Fn,
|
||||
) -> BoxFuture<'a, Self::Output>
|
||||
where
|
||||
Self: Sized + 'a,
|
||||
Fn: FnOnce(&Span) -> Span,
|
||||
{
|
||||
match &ctx.perf_span {
|
||||
Some(perf_span) => {
|
||||
assert!(ctx.perf_span_dispatch.is_some());
|
||||
let dispatcher = ctx.perf_span_dispatch.as_ref().unwrap();
|
||||
|
||||
let new_span =
|
||||
tracing::dispatcher::with_default(dispatcher, || make_span(perf_span.inner()));
|
||||
|
||||
let new_perf_span = PerfSpan::new(new_span, dispatcher.clone());
|
||||
self.instrument(new_perf_span).boxed()
|
||||
}
|
||||
None => self.boxed(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Implement the trait for all types that satisfy the trait bounds
|
||||
impl<'a, T: Future + Send + 'a> PerfInstrumentFutureExt<'a> for T {}
|
||||
|
||||
@@ -74,8 +74,8 @@ use crate::tenant::size::ModelInputs;
|
||||
use crate::tenant::storage_layer::{IoConcurrency, LayerAccessStatsReset, LayerName};
|
||||
use crate::tenant::timeline::offload::{OffloadError, offload_timeline};
|
||||
use crate::tenant::timeline::{
|
||||
CompactFlags, CompactOptions, CompactRequest, CompactionError, Timeline, WaitLsnTimeout,
|
||||
WaitLsnWaiter, import_pgdata,
|
||||
CompactFlags, CompactOptions, CompactRequest, CompactionError, MarkInvisibleRequest, Timeline,
|
||||
WaitLsnTimeout, WaitLsnWaiter, import_pgdata,
|
||||
};
|
||||
use crate::tenant::{
|
||||
GetTimelineError, LogicalSizeCalculationCause, OffloadedTimeline, PageReconstructError,
|
||||
@@ -445,6 +445,9 @@ async fn build_timeline_info_common(
|
||||
|
||||
let (pitr_history_size, within_ancestor_pitr) = timeline.get_pitr_history_stats();
|
||||
|
||||
// Externally, expose the lowest LSN that can be used to create a branch.
|
||||
// Internally we distinguish between the planned GC cutoff (PITR point) and the "applied" GC cutoff (where we
|
||||
// actually trimmed data to), which can pass each other when PITR is changed.
|
||||
let min_readable_lsn = std::cmp::max(
|
||||
timeline.get_gc_cutoff_lsn(),
|
||||
*timeline.get_applied_gc_cutoff_lsn(),
|
||||
@@ -461,7 +464,6 @@ async fn build_timeline_info_common(
|
||||
initdb_lsn,
|
||||
last_record_lsn,
|
||||
prev_record_lsn: Some(timeline.get_prev_record_lsn()),
|
||||
_unused: Default::default(), // Unused, for legacy decode only
|
||||
min_readable_lsn,
|
||||
applied_gc_cutoff_lsn: *timeline.get_applied_gc_cutoff_lsn(),
|
||||
current_logical_size: current_logical_size.size_dont_care_about_accuracy(),
|
||||
@@ -2335,21 +2337,31 @@ async fn timeline_compact_handler(
|
||||
}
|
||||
|
||||
async fn timeline_mark_invisible_handler(
|
||||
request: Request<Body>,
|
||||
mut request: Request<Body>,
|
||||
_cancel: CancellationToken,
|
||||
) -> Result<Response<Body>, ApiError> {
|
||||
let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
|
||||
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
|
||||
check_permission(&request, Some(tenant_shard_id.tenant_id))?;
|
||||
|
||||
let compact_request = json_request_maybe::<Option<MarkInvisibleRequest>>(&mut request).await?;
|
||||
|
||||
let state = get_state(&request);
|
||||
|
||||
let visibility = match compact_request {
|
||||
Some(req) => match req.is_visible {
|
||||
Some(true) => TimelineVisibilityState::Visible,
|
||||
Some(false) | None => TimelineVisibilityState::Invisible,
|
||||
},
|
||||
None => TimelineVisibilityState::Invisible,
|
||||
};
|
||||
|
||||
async {
|
||||
let tenant = state
|
||||
.tenant_manager
|
||||
.get_attached_tenant_shard(tenant_shard_id)?;
|
||||
let timeline = tenant.get_timeline(timeline_id, true)?;
|
||||
timeline.remote_client.schedule_index_upload_for_timeline_invisible_state(TimelineVisibilityState::Invisible).map_err(ApiError::InternalServerError)?;
|
||||
timeline.remote_client.schedule_index_upload_for_timeline_invisible_state(visibility).map_err(ApiError::InternalServerError)?;
|
||||
json_response(StatusCode::OK, ())
|
||||
}
|
||||
.instrument(info_span!("manual_timeline_mark_invisible", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug(), %timeline_id))
|
||||
@@ -2685,11 +2697,12 @@ async fn getpage_at_lsn_handler_inner(
|
||||
let lsn: Option<Lsn> = parse_query_param(&request, "lsn")?;
|
||||
|
||||
async {
|
||||
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
|
||||
// Enable read path debugging
|
||||
let timeline = active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id).await?;
|
||||
let ctx = RequestContextBuilder::extend(&ctx).read_path_debug(true)
|
||||
.scope(context::Scope::new_timeline(&timeline)).build();
|
||||
let ctx = RequestContextBuilder::new(TaskKind::MgmtRequest)
|
||||
.download_behavior(DownloadBehavior::Download)
|
||||
.scope(context::Scope::new_timeline(&timeline))
|
||||
.read_path_debug(true)
|
||||
.root();
|
||||
|
||||
// Use last_record_lsn if no lsn is provided
|
||||
let lsn = lsn.unwrap_or_else(|| timeline.get_last_record_lsn());
|
||||
@@ -3176,7 +3189,8 @@ async fn list_aux_files(
|
||||
timeline.gate.enter().map_err(|_| ApiError::Cancelled)?,
|
||||
);
|
||||
|
||||
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
|
||||
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download)
|
||||
.with_scope_timeline(&timeline);
|
||||
let files = timeline
|
||||
.list_aux_files(body.lsn, &ctx, io_concurrency)
|
||||
.await?;
|
||||
@@ -3420,14 +3434,15 @@ async fn put_tenant_timeline_import_wal(
|
||||
|
||||
check_permission(&request, Some(tenant_id))?;
|
||||
|
||||
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn);
|
||||
|
||||
let span = info_span!("import_wal", tenant_id=%tenant_id, timeline_id=%timeline_id, start_lsn=%start_lsn, end_lsn=%end_lsn);
|
||||
async move {
|
||||
let state = get_state(&request);
|
||||
|
||||
let timeline = active_timeline_of_active_tenant(&state.tenant_manager, TenantShardId::unsharded(tenant_id), timeline_id).await?;
|
||||
let ctx = RequestContextBuilder::extend(&ctx).scope(context::Scope::new_timeline(&timeline)).build();
|
||||
let ctx = RequestContextBuilder::new(TaskKind::MgmtRequest)
|
||||
.download_behavior(DownloadBehavior::Warn)
|
||||
.scope(context::Scope::new_timeline(&timeline))
|
||||
.root();
|
||||
|
||||
let mut body = StreamReader::new(request.into_body().map(|res| {
|
||||
res.map_err(|error| {
|
||||
|
||||
@@ -55,6 +55,9 @@ pub const DEFAULT_PG_VERSION: u32 = 16;
|
||||
pub const IMAGE_FILE_MAGIC: u16 = 0x5A60;
|
||||
pub const DELTA_FILE_MAGIC: u16 = 0x5A61;
|
||||
|
||||
// Target used for performance traces.
|
||||
pub const PERF_TRACE_TARGET: &str = "P";
|
||||
|
||||
static ZERO_PAGE: bytes::Bytes = bytes::Bytes::from_static(&[0u8; 8192]);
|
||||
|
||||
pub use crate::metrics::preinitialize_metrics;
|
||||
|
||||
@@ -1,10 +1,8 @@
|
||||
use std::collections::HashMap;
|
||||
use std::num::NonZeroUsize;
|
||||
use std::os::fd::RawFd;
|
||||
use std::pin::Pin;
|
||||
use std::sync::atomic::AtomicU64;
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::task::{Context, Poll};
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use enum_map::{Enum as _, EnumMap};
|
||||
@@ -23,13 +21,13 @@ use pageserver_api::config::{
|
||||
};
|
||||
use pageserver_api::models::InMemoryLayerInfo;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use pin_project_lite::pin_project;
|
||||
use postgres_backend::{QueryError, is_expected_io_error};
|
||||
use pq_proto::framed::ConnectionError;
|
||||
use strum::{EnumCount, IntoEnumIterator as _, VariantNames};
|
||||
use strum_macros::{IntoStaticStr, VariantNames};
|
||||
use utils::id::TimelineId;
|
||||
|
||||
use crate::config;
|
||||
use crate::config::PageServerConf;
|
||||
use crate::context::{PageContentKind, RequestContext};
|
||||
use crate::pgdatadir_mapping::DatadirModificationStats;
|
||||
@@ -499,6 +497,100 @@ pub(crate) static WAIT_LSN_IN_PROGRESS_GLOBAL_MICROS: Lazy<IntCounter> = Lazy::n
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
pub(crate) mod wait_ondemand_download_time {
|
||||
use super::*;
|
||||
const WAIT_ONDEMAND_DOWNLOAD_TIME_BUCKETS: &[f64] = &[
|
||||
0.01, 0.02, 0.03, 0.04, 0.05, 0.06, 0.07, 0.08, 0.09, // 10 ms - 100ms
|
||||
0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, // 100ms to 1s
|
||||
1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, // 1s to 10s
|
||||
10.0, 20.0, 30.0, 40.0, 50.0, 60.0, // 10s to 1m
|
||||
];
|
||||
|
||||
/// The task kinds for which we want to track wait times for on-demand downloads.
|
||||
/// Other task kinds' wait times are accumulated in label value `unknown`.
|
||||
pub(crate) const WAIT_ONDEMAND_DOWNLOAD_METRIC_TASK_KINDS: [TaskKind; 2] = [
|
||||
TaskKind::PageRequestHandler,
|
||||
TaskKind::WalReceiverConnectionHandler,
|
||||
];
|
||||
|
||||
pub(crate) static WAIT_ONDEMAND_DOWNLOAD_TIME_GLOBAL: Lazy<Vec<Histogram>> = Lazy::new(|| {
|
||||
let histo = register_histogram_vec!(
|
||||
"pageserver_wait_ondemand_download_seconds_global",
|
||||
"Observations are individual tasks' wait times for on-demand downloads. \
|
||||
If N tasks coalesce on an on-demand download, and it takes 10s, than we observe N * 10s.",
|
||||
&["task_kind"],
|
||||
WAIT_ONDEMAND_DOWNLOAD_TIME_BUCKETS.into(),
|
||||
)
|
||||
.expect("failed to define a metric");
|
||||
WAIT_ONDEMAND_DOWNLOAD_METRIC_TASK_KINDS
|
||||
.iter()
|
||||
.map(|task_kind| histo.with_label_values(&[task_kind.into()]))
|
||||
.collect::<Vec<_>>()
|
||||
});
|
||||
|
||||
pub(crate) static WAIT_ONDEMAND_DOWNLOAD_TIME_SUM: Lazy<CounterVec> = Lazy::new(|| {
|
||||
register_counter_vec!(
|
||||
// use a name that _could_ be evolved into a per-timeline histogram later
|
||||
"pageserver_wait_ondemand_download_seconds_sum",
|
||||
"Like `pageserver_wait_ondemand_download_seconds_global` but per timeline",
|
||||
&["tenant_id", "shard_id", "timeline_id", "task_kind"],
|
||||
)
|
||||
.unwrap()
|
||||
});
|
||||
|
||||
pub struct WaitOndemandDownloadTimeSum {
|
||||
counters: [Counter; WAIT_ONDEMAND_DOWNLOAD_METRIC_TASK_KINDS.len()],
|
||||
}
|
||||
|
||||
impl WaitOndemandDownloadTimeSum {
|
||||
pub(crate) fn new(tenant_id: &str, shard_id: &str, timeline_id: &str) -> Self {
|
||||
let counters = WAIT_ONDEMAND_DOWNLOAD_METRIC_TASK_KINDS
|
||||
.iter()
|
||||
.map(|task_kind| {
|
||||
WAIT_ONDEMAND_DOWNLOAD_TIME_SUM
|
||||
.get_metric_with_label_values(&[
|
||||
tenant_id,
|
||||
shard_id,
|
||||
timeline_id,
|
||||
task_kind.into(),
|
||||
])
|
||||
.unwrap()
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
Self {
|
||||
counters: counters.try_into().unwrap(),
|
||||
}
|
||||
}
|
||||
pub(crate) fn observe(&self, task_kind: TaskKind, duration: Duration) {
|
||||
let maybe = WAIT_ONDEMAND_DOWNLOAD_METRIC_TASK_KINDS
|
||||
.iter()
|
||||
.enumerate()
|
||||
.find(|(_, kind)| **kind == task_kind);
|
||||
let Some((idx, _)) = maybe else {
|
||||
return;
|
||||
};
|
||||
WAIT_ONDEMAND_DOWNLOAD_TIME_GLOBAL[idx].observe(duration.as_secs_f64());
|
||||
let counter = &self.counters[idx];
|
||||
counter.inc_by(duration.as_secs_f64());
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn shutdown_timeline(tenant_id: &str, shard_id: &str, timeline_id: &str) {
|
||||
for task_kind in WAIT_ONDEMAND_DOWNLOAD_METRIC_TASK_KINDS {
|
||||
let _ = WAIT_ONDEMAND_DOWNLOAD_TIME_SUM.remove_label_values(&[
|
||||
tenant_id,
|
||||
shard_id,
|
||||
timeline_id,
|
||||
task_kind.into(),
|
||||
]);
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn preinitialize_global_metrics() {
|
||||
Lazy::force(&WAIT_ONDEMAND_DOWNLOAD_TIME_GLOBAL);
|
||||
}
|
||||
}
|
||||
|
||||
static LAST_RECORD_LSN: Lazy<IntGaugeVec> = Lazy::new(|| {
|
||||
register_int_gauge_vec!(
|
||||
"pageserver_last_record_lsn",
|
||||
@@ -1248,13 +1340,13 @@ pub(crate) static STORAGE_IO_TIME_METRIC: Lazy<StorageIoTime> = Lazy::new(Storag
|
||||
|
||||
#[derive(Clone, Copy)]
|
||||
#[repr(usize)]
|
||||
enum StorageIoSizeOperation {
|
||||
pub(crate) enum StorageIoSizeOperation {
|
||||
Read,
|
||||
Write,
|
||||
}
|
||||
|
||||
impl StorageIoSizeOperation {
|
||||
const VARIANTS: &'static [&'static str] = &["read", "write"];
|
||||
pub(crate) const VARIANTS: &'static [&'static str] = &["read", "write"];
|
||||
|
||||
fn as_str(&self) -> &'static str {
|
||||
Self::VARIANTS[*self as usize]
|
||||
@@ -1262,7 +1354,7 @@ impl StorageIoSizeOperation {
|
||||
}
|
||||
|
||||
// Needed for the https://neonprod.grafana.net/d/5uK9tHL4k/picking-tenant-for-relocation?orgId=1
|
||||
static STORAGE_IO_SIZE: Lazy<UIntGaugeVec> = Lazy::new(|| {
|
||||
pub(crate) static STORAGE_IO_SIZE: Lazy<UIntGaugeVec> = Lazy::new(|| {
|
||||
register_uint_gauge_vec!(
|
||||
"pageserver_io_operations_bytes_total",
|
||||
"Total amount of bytes read/written in IO operations",
|
||||
@@ -2314,13 +2406,18 @@ impl RemoteOpFileKind {
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) static REMOTE_OPERATION_TIME: Lazy<HistogramVec> = Lazy::new(|| {
|
||||
pub(crate) static REMOTE_TIMELINE_CLIENT_COMPLETION_LATENCY: Lazy<HistogramVec> = Lazy::new(|| {
|
||||
register_histogram_vec!(
|
||||
"pageserver_remote_operation_seconds",
|
||||
"Time spent on remote storage operations. \
|
||||
Grouped by tenant, timeline, operation_kind and status. \
|
||||
"pageserver_remote_timeline_client_seconds_global",
|
||||
"Time spent on remote timeline client operations. \
|
||||
Grouped by task_kind, file_kind, operation_kind and status. \
|
||||
The task_kind is \
|
||||
- for layer downloads, populated from RequestContext (primary objective of having the label) \
|
||||
- for index downloads, set to 'unknown' \
|
||||
- for any upload operation, set to 'RemoteUploadTask' \
|
||||
This keeps dimensionality at bay. \
|
||||
Does not account for time spent waiting in remote timeline client's queues.",
|
||||
&["file_kind", "op_kind", "status"]
|
||||
&["task_kind", "file_kind", "op_kind", "status"]
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
@@ -2882,6 +2979,7 @@ pub(crate) struct TimelineMetrics {
|
||||
pub storage_io_size: StorageIoSizeMetrics,
|
||||
pub wait_lsn_in_progress_micros: GlobalAndPerTenantIntCounter,
|
||||
pub wait_lsn_start_finish_counterpair: IntCounterPair,
|
||||
pub wait_ondemand_download_time: wait_ondemand_download_time::WaitOndemandDownloadTimeSum,
|
||||
shutdown: std::sync::atomic::AtomicBool,
|
||||
}
|
||||
|
||||
@@ -3027,6 +3125,13 @@ impl TimelineMetrics {
|
||||
.get_metric_with_label_values(&[&tenant_id, &shard_id, &timeline_id])
|
||||
.unwrap();
|
||||
|
||||
let wait_ondemand_download_time =
|
||||
wait_ondemand_download_time::WaitOndemandDownloadTimeSum::new(
|
||||
&tenant_id,
|
||||
&shard_id,
|
||||
&timeline_id,
|
||||
);
|
||||
|
||||
TimelineMetrics {
|
||||
tenant_id,
|
||||
shard_id,
|
||||
@@ -3060,6 +3165,7 @@ impl TimelineMetrics {
|
||||
wal_records_received,
|
||||
wait_lsn_in_progress_micros,
|
||||
wait_lsn_start_finish_counterpair,
|
||||
wait_ondemand_download_time,
|
||||
shutdown: std::sync::atomic::AtomicBool::default(),
|
||||
}
|
||||
}
|
||||
@@ -3252,6 +3358,8 @@ impl TimelineMetrics {
|
||||
.remove_label_values(&mut res, &[tenant_id, shard_id, timeline_id]);
|
||||
}
|
||||
|
||||
wait_ondemand_download_time::shutdown_timeline(tenant_id, shard_id, timeline_id);
|
||||
|
||||
let _ = SMGR_QUERY_STARTED_PER_TENANT_TIMELINE.remove_label_values(&[
|
||||
SmgrQueryType::GetPageAtLsn.into(),
|
||||
tenant_id,
|
||||
@@ -3373,13 +3481,18 @@ impl RemoteTimelineClientMetrics {
|
||||
|
||||
pub fn remote_operation_time(
|
||||
&self,
|
||||
task_kind: Option<TaskKind>,
|
||||
file_kind: &RemoteOpFileKind,
|
||||
op_kind: &RemoteOpKind,
|
||||
status: &'static str,
|
||||
) -> Histogram {
|
||||
let key = (file_kind.as_str(), op_kind.as_str(), status);
|
||||
REMOTE_OPERATION_TIME
|
||||
.get_metric_with_label_values(&[key.0, key.1, key.2])
|
||||
REMOTE_TIMELINE_CLIENT_COMPLETION_LATENCY
|
||||
.get_metric_with_label_values(&[
|
||||
task_kind.as_ref().map(|tk| tk.into()).unwrap_or("unknown"),
|
||||
file_kind.as_str(),
|
||||
op_kind.as_str(),
|
||||
status,
|
||||
])
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
@@ -3624,54 +3737,26 @@ impl Drop for RemoteTimelineClientMetrics {
|
||||
|
||||
/// Wrapper future that measures the time spent by a remote storage operation,
|
||||
/// and records the time and success/failure as a prometheus metric.
|
||||
pub(crate) trait MeasureRemoteOp: Sized {
|
||||
fn measure_remote_op(
|
||||
pub(crate) trait MeasureRemoteOp<O, E>: Sized + Future<Output = Result<O, E>> {
|
||||
async fn measure_remote_op(
|
||||
self,
|
||||
task_kind: Option<TaskKind>, // not all caller contexts have a RequestContext / TaskKind handy
|
||||
file_kind: RemoteOpFileKind,
|
||||
op: RemoteOpKind,
|
||||
metrics: Arc<RemoteTimelineClientMetrics>,
|
||||
) -> MeasuredRemoteOp<Self> {
|
||||
) -> Result<O, E> {
|
||||
let start = Instant::now();
|
||||
MeasuredRemoteOp {
|
||||
inner: self,
|
||||
file_kind,
|
||||
op,
|
||||
start,
|
||||
metrics,
|
||||
}
|
||||
let res = self.await;
|
||||
let duration = start.elapsed();
|
||||
let status = if res.is_ok() { &"success" } else { &"failure" };
|
||||
metrics
|
||||
.remote_operation_time(task_kind, &file_kind, &op, status)
|
||||
.observe(duration.as_secs_f64());
|
||||
res
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Sized> MeasureRemoteOp for T {}
|
||||
|
||||
pin_project! {
|
||||
pub(crate) struct MeasuredRemoteOp<F>
|
||||
{
|
||||
#[pin]
|
||||
inner: F,
|
||||
file_kind: RemoteOpFileKind,
|
||||
op: RemoteOpKind,
|
||||
start: Instant,
|
||||
metrics: Arc<RemoteTimelineClientMetrics>,
|
||||
}
|
||||
}
|
||||
|
||||
impl<F: Future<Output = Result<O, E>>, O, E> Future for MeasuredRemoteOp<F> {
|
||||
type Output = Result<O, E>;
|
||||
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
let this = self.project();
|
||||
let poll_result = this.inner.poll(cx);
|
||||
if let Poll::Ready(ref res) = poll_result {
|
||||
let duration = this.start.elapsed();
|
||||
let status = if res.is_ok() { &"success" } else { &"failure" };
|
||||
this.metrics
|
||||
.remote_operation_time(this.file_kind, this.op, status)
|
||||
.observe(duration.as_secs_f64());
|
||||
}
|
||||
poll_result
|
||||
}
|
||||
}
|
||||
impl<Fut, O, E> MeasureRemoteOp<O, E> for Fut where Fut: Sized + Future<Output = Result<O, E>> {}
|
||||
|
||||
pub mod tokio_epoll_uring {
|
||||
use std::collections::HashMap;
|
||||
@@ -4107,9 +4192,33 @@ pub(crate) fn set_tokio_runtime_setup(setup: &str, num_threads: NonZeroUsize) {
|
||||
.set(u64::try_from(num_threads.get()).unwrap());
|
||||
}
|
||||
|
||||
pub fn preinitialize_metrics(conf: &'static PageServerConf) {
|
||||
static PAGESERVER_CONFIG_IGNORED_ITEMS: Lazy<UIntGaugeVec> = Lazy::new(|| {
|
||||
register_uint_gauge_vec!(
|
||||
"pageserver_config_ignored_items",
|
||||
"TOML items present in the on-disk configuration file but ignored by the pageserver config parser.\
|
||||
The `item` label is the dot-separated path of the ignored item in the on-disk configuration file.\
|
||||
The value for an unknown config item is always 1.\
|
||||
There is a special label value \"\", which is 0, so that there is always a metric exposed (simplifies dashboards).",
|
||||
&["item"]
|
||||
)
|
||||
.unwrap()
|
||||
});
|
||||
|
||||
pub fn preinitialize_metrics(
|
||||
conf: &'static PageServerConf,
|
||||
ignored: config::ignored_fields::Paths,
|
||||
) {
|
||||
set_page_service_config_max_batch_size(&conf.page_service_pipelining);
|
||||
|
||||
PAGESERVER_CONFIG_IGNORED_ITEMS
|
||||
.with_label_values(&[""])
|
||||
.set(0);
|
||||
for path in &ignored.paths {
|
||||
PAGESERVER_CONFIG_IGNORED_ITEMS
|
||||
.with_label_values(&[path])
|
||||
.set(1);
|
||||
}
|
||||
|
||||
// Python tests need these and on some we do alerting.
|
||||
//
|
||||
// FIXME(4813): make it so that we have no top level metrics as this fn will easily fall out of
|
||||
@@ -4195,4 +4304,5 @@ pub fn preinitialize_metrics(conf: &'static PageServerConf) {
|
||||
Lazy::force(&tokio_epoll_uring::THREAD_LOCAL_METRICS_STORAGE);
|
||||
|
||||
tenant_throttling::preinitialize_global_metrics();
|
||||
wait_ondemand_download_time::preinitialize_global_metrics();
|
||||
}
|
||||
|
||||
@@ -9,6 +9,7 @@ use std::sync::Arc;
|
||||
use std::time::{Duration, Instant, SystemTime};
|
||||
use std::{io, str};
|
||||
|
||||
use crate::PERF_TRACE_TARGET;
|
||||
use anyhow::{Context, bail};
|
||||
use async_compression::tokio::write::GzipEncoder;
|
||||
use bytes::Buf;
|
||||
@@ -53,7 +54,9 @@ use utils::sync::spsc_fold;
|
||||
use crate::auth::check_permission;
|
||||
use crate::basebackup::BasebackupError;
|
||||
use crate::config::PageServerConf;
|
||||
use crate::context::{DownloadBehavior, RequestContext};
|
||||
use crate::context::{
|
||||
DownloadBehavior, PerfInstrumentFutureExt, RequestContext, RequestContextBuilder,
|
||||
};
|
||||
use crate::metrics::{
|
||||
self, COMPUTE_COMMANDS_COUNTERS, ComputeCommandKind, LIVE_CONNECTIONS, SmgrOpTimer,
|
||||
TimelineMetrics,
|
||||
@@ -100,6 +103,7 @@ pub fn spawn(
|
||||
conf: &'static PageServerConf,
|
||||
tenant_manager: Arc<TenantManager>,
|
||||
pg_auth: Option<Arc<SwappableJwtAuth>>,
|
||||
perf_trace_dispatch: Option<Dispatch>,
|
||||
tcp_listener: tokio::net::TcpListener,
|
||||
) -> Listener {
|
||||
let cancel = CancellationToken::new();
|
||||
@@ -117,6 +121,7 @@ pub fn spawn(
|
||||
conf,
|
||||
tenant_manager,
|
||||
pg_auth,
|
||||
perf_trace_dispatch,
|
||||
tcp_listener,
|
||||
conf.pg_auth_type,
|
||||
conf.page_service_pipelining.clone(),
|
||||
@@ -173,6 +178,7 @@ pub async fn libpq_listener_main(
|
||||
conf: &'static PageServerConf,
|
||||
tenant_manager: Arc<TenantManager>,
|
||||
auth: Option<Arc<SwappableJwtAuth>>,
|
||||
perf_trace_dispatch: Option<Dispatch>,
|
||||
listener: tokio::net::TcpListener,
|
||||
auth_type: AuthType,
|
||||
pipelining_config: PageServicePipeliningConfig,
|
||||
@@ -205,8 +211,12 @@ pub async fn libpq_listener_main(
|
||||
// Connection established. Spawn a new task to handle it.
|
||||
debug!("accepted connection from {}", peer_addr);
|
||||
let local_auth = auth.clone();
|
||||
let connection_ctx = listener_ctx
|
||||
.detached_child(TaskKind::PageRequestHandler, DownloadBehavior::Download);
|
||||
let connection_ctx = RequestContextBuilder::from(&listener_ctx)
|
||||
.task_kind(TaskKind::PageRequestHandler)
|
||||
.download_behavior(DownloadBehavior::Download)
|
||||
.perf_span_dispatch(perf_trace_dispatch.clone())
|
||||
.detached_child();
|
||||
|
||||
connection_handler_tasks.spawn(page_service_conn_main(
|
||||
conf,
|
||||
tenant_manager.clone(),
|
||||
@@ -237,6 +247,15 @@ pub async fn libpq_listener_main(
|
||||
|
||||
type ConnectionHandlerResult = anyhow::Result<()>;
|
||||
|
||||
/// Perf root spans start at the per-request level, after shard routing.
|
||||
/// This struct carries connection-level information to the root perf span definition.
|
||||
#[derive(Clone)]
|
||||
struct ConnectionPerfSpanFields {
|
||||
peer_addr: String,
|
||||
application_name: Option<String>,
|
||||
compute_mode: Option<String>,
|
||||
}
|
||||
|
||||
#[instrument(skip_all, fields(peer_addr, application_name, compute_mode))]
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
async fn page_service_conn_main(
|
||||
@@ -261,6 +280,12 @@ async fn page_service_conn_main(
|
||||
let socket_fd = socket.as_raw_fd();
|
||||
|
||||
let peer_addr = socket.peer_addr().context("get peer address")?;
|
||||
|
||||
let perf_span_fields = ConnectionPerfSpanFields {
|
||||
peer_addr: peer_addr.to_string(),
|
||||
application_name: None, // filled in later
|
||||
compute_mode: None, // filled in later
|
||||
};
|
||||
tracing::Span::current().record("peer_addr", field::display(peer_addr));
|
||||
|
||||
// setup read timeout of 10 minutes. the timeout is rather arbitrary for requirements:
|
||||
@@ -304,6 +329,7 @@ async fn page_service_conn_main(
|
||||
tenant_manager,
|
||||
auth,
|
||||
pipelining_config,
|
||||
perf_span_fields,
|
||||
connection_ctx,
|
||||
cancel.clone(),
|
||||
gate_guard,
|
||||
@@ -348,6 +374,8 @@ struct PageServerHandler {
|
||||
/// `process_query` creates a child context from this one.
|
||||
connection_ctx: RequestContext,
|
||||
|
||||
perf_span_fields: ConnectionPerfSpanFields,
|
||||
|
||||
cancel: CancellationToken,
|
||||
|
||||
/// None only while pagestream protocol is being processed.
|
||||
@@ -607,6 +635,7 @@ impl std::fmt::Display for BatchedPageStreamError {
|
||||
struct BatchedGetPageRequest {
|
||||
req: PagestreamGetPageRequest,
|
||||
timer: SmgrOpTimer,
|
||||
ctx: RequestContext,
|
||||
}
|
||||
|
||||
#[cfg(feature = "testing")]
|
||||
@@ -692,11 +721,13 @@ impl BatchedFeMessage {
|
||||
}
|
||||
|
||||
impl PageServerHandler {
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub fn new(
|
||||
conf: &'static PageServerConf,
|
||||
tenant_manager: Arc<TenantManager>,
|
||||
auth: Option<Arc<SwappableJwtAuth>>,
|
||||
pipelining_config: PageServicePipeliningConfig,
|
||||
perf_span_fields: ConnectionPerfSpanFields,
|
||||
connection_ctx: RequestContext,
|
||||
cancel: CancellationToken,
|
||||
gate_guard: GateGuard,
|
||||
@@ -706,6 +737,7 @@ impl PageServerHandler {
|
||||
auth,
|
||||
claims: None,
|
||||
connection_ctx,
|
||||
perf_span_fields,
|
||||
timeline_handles: Some(TimelineHandles::new(tenant_manager)),
|
||||
cancel,
|
||||
pipelining_config,
|
||||
@@ -743,6 +775,7 @@ impl PageServerHandler {
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
timeline_handles: &mut TimelineHandles,
|
||||
conn_perf_span_fields: &ConnectionPerfSpanFields,
|
||||
cancel: &CancellationToken,
|
||||
ctx: &RequestContext,
|
||||
protocol_version: PagestreamProtocolVersion,
|
||||
@@ -902,10 +935,12 @@ impl PageServerHandler {
|
||||
}
|
||||
|
||||
let key = rel_block_to_key(req.rel, req.blkno);
|
||||
let shard = match timeline_handles
|
||||
|
||||
let res = timeline_handles
|
||||
.get(tenant_id, timeline_id, ShardSelector::Page(key))
|
||||
.await
|
||||
{
|
||||
.await;
|
||||
|
||||
let shard = match res {
|
||||
Ok(tl) => tl,
|
||||
Err(e) => {
|
||||
let span = mkspan!(before shard routing);
|
||||
@@ -932,6 +967,41 @@ impl PageServerHandler {
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
let ctx = if shard.is_get_page_request_sampled() {
|
||||
RequestContextBuilder::from(ctx)
|
||||
.root_perf_span(|| {
|
||||
info_span!(
|
||||
target: PERF_TRACE_TARGET,
|
||||
"GET_PAGE",
|
||||
peer_addr = conn_perf_span_fields.peer_addr,
|
||||
application_name = conn_perf_span_fields.application_name,
|
||||
compute_mode = conn_perf_span_fields.compute_mode,
|
||||
tenant_id = %tenant_id,
|
||||
shard_id = %shard.get_shard_identity().shard_slug(),
|
||||
timeline_id = %timeline_id,
|
||||
lsn = %req.hdr.request_lsn,
|
||||
request_id = %req.hdr.reqid,
|
||||
key = %key,
|
||||
)
|
||||
})
|
||||
.attached_child()
|
||||
} else {
|
||||
ctx.attached_child()
|
||||
};
|
||||
|
||||
// This ctx travels as part of the BatchedFeMessage through
|
||||
// batching into the request handler.
|
||||
// The request handler needs to do some per-request work
|
||||
// (relsize check) before dispatching the batch as a single
|
||||
// get_vectored call to the Timeline.
|
||||
// This ctx will be used for the reslize check, whereas the
|
||||
// get_vectored call will be a different ctx with separate
|
||||
// perf span.
|
||||
let ctx = ctx.with_scope_page_service_pagestream(&shard);
|
||||
|
||||
// Similar game for this `span`: we funnel it through so that
|
||||
// request handler log messages contain the request-specific fields.
|
||||
let span = mkspan!(shard.tenant_shard_id.shard_slug());
|
||||
|
||||
let timer = record_op_start_and_throttle(
|
||||
@@ -939,19 +1009,34 @@ impl PageServerHandler {
|
||||
metrics::SmgrQueryType::GetPageAtLsn,
|
||||
received_at,
|
||||
)
|
||||
.maybe_perf_instrument(&ctx, |current_perf_span| {
|
||||
info_span!(
|
||||
target: PERF_TRACE_TARGET,
|
||||
parent: current_perf_span,
|
||||
"THROTTLE",
|
||||
)
|
||||
})
|
||||
.await?;
|
||||
|
||||
// We're holding the Handle
|
||||
let effective_request_lsn = match Self::wait_or_get_last_lsn(
|
||||
// TODO: if we actually need to wait for lsn here, it delays the entire batch which doesn't need to wait
|
||||
let res = Self::wait_or_get_last_lsn(
|
||||
&shard,
|
||||
req.hdr.request_lsn,
|
||||
req.hdr.not_modified_since,
|
||||
&shard.get_applied_gc_cutoff_lsn(),
|
||||
ctx,
|
||||
&ctx,
|
||||
)
|
||||
// TODO: if we actually need to wait for lsn here, it delays the entire batch which doesn't need to wait
|
||||
.await
|
||||
{
|
||||
.maybe_perf_instrument(&ctx, |current_perf_span| {
|
||||
info_span!(
|
||||
target: PERF_TRACE_TARGET,
|
||||
parent: current_perf_span,
|
||||
"WAIT_LSN",
|
||||
)
|
||||
})
|
||||
.await;
|
||||
|
||||
let effective_request_lsn = match res {
|
||||
Ok(lsn) => lsn,
|
||||
Err(e) => {
|
||||
return respond_error!(span, e);
|
||||
@@ -961,7 +1046,7 @@ impl PageServerHandler {
|
||||
span,
|
||||
shard: shard.downgrade(),
|
||||
effective_request_lsn,
|
||||
pages: smallvec::smallvec![BatchedGetPageRequest { req, timer }],
|
||||
pages: smallvec::smallvec![BatchedGetPageRequest { req, timer, ctx }],
|
||||
}
|
||||
}
|
||||
#[cfg(feature = "testing")]
|
||||
@@ -1514,12 +1599,14 @@ impl PageServerHandler {
|
||||
IO: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
|
||||
{
|
||||
let cancel = self.cancel.clone();
|
||||
|
||||
let err = loop {
|
||||
let msg = Self::pagestream_read_message(
|
||||
&mut pgb_reader,
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
&mut timeline_handles,
|
||||
&self.perf_span_fields,
|
||||
&cancel,
|
||||
ctx,
|
||||
protocol_version,
|
||||
@@ -1653,6 +1740,8 @@ impl PageServerHandler {
|
||||
// Batcher
|
||||
//
|
||||
|
||||
let perf_span_fields = self.perf_span_fields.clone();
|
||||
|
||||
let cancel_batcher = self.cancel.child_token();
|
||||
let (mut batch_tx, mut batch_rx) = spsc_fold::channel();
|
||||
let batcher = pipeline_stage!("batcher", cancel_batcher.clone(), move |cancel_batcher| {
|
||||
@@ -1666,6 +1755,7 @@ impl PageServerHandler {
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
&mut timeline_handles,
|
||||
&perf_span_fields,
|
||||
&cancel_batcher,
|
||||
&ctx,
|
||||
protocol_version,
|
||||
@@ -2004,7 +2094,9 @@ impl PageServerHandler {
|
||||
|
||||
let results = timeline
|
||||
.get_rel_page_at_lsn_batched(
|
||||
requests.iter().map(|p| (&p.req.rel, &p.req.blkno)),
|
||||
requests
|
||||
.iter()
|
||||
.map(|p| (&p.req.rel, &p.req.blkno, p.ctx.attached_child())),
|
||||
effective_lsn,
|
||||
io_concurrency,
|
||||
ctx,
|
||||
@@ -2606,12 +2698,14 @@ where
|
||||
|
||||
if let FeStartupPacket::StartupMessage { params, .. } = sm {
|
||||
if let Some(app_name) = params.get("application_name") {
|
||||
self.perf_span_fields.application_name = Some(app_name.to_string());
|
||||
Span::current().record("application_name", field::display(app_name));
|
||||
}
|
||||
if let Some(options) = params.get("options") {
|
||||
let (config, _) = parse_options(options);
|
||||
for (key, value) in config {
|
||||
if key == "neon.compute_mode" {
|
||||
self.perf_span_fields.compute_mode = Some(value.clone());
|
||||
Span::current().record("compute_mode", field::display(value));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -9,6 +9,7 @@
|
||||
use std::collections::{BTreeMap, HashMap, HashSet, hash_map};
|
||||
use std::ops::{ControlFlow, Range};
|
||||
|
||||
use crate::PERF_TRACE_TARGET;
|
||||
use anyhow::{Context, ensure};
|
||||
use bytes::{Buf, Bytes, BytesMut};
|
||||
use enum_map::Enum;
|
||||
@@ -31,7 +32,7 @@ use postgres_ffi::{BLCKSZ, Oid, RepOriginId, TimestampTz, TransactionId};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use strum::IntoEnumIterator;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{debug, info, trace, warn};
|
||||
use tracing::{debug, info, info_span, trace, warn};
|
||||
use utils::bin_ser::{BeSer, DeserializeError};
|
||||
use utils::lsn::Lsn;
|
||||
use utils::pausable_failpoint;
|
||||
@@ -39,7 +40,7 @@ use wal_decoder::serialized_batch::{SerializedValueBatch, ValueMeta};
|
||||
|
||||
use super::tenant::{PageReconstructError, Timeline};
|
||||
use crate::aux_file;
|
||||
use crate::context::RequestContext;
|
||||
use crate::context::{PerfInstrumentFutureExt, RequestContext, RequestContextBuilder};
|
||||
use crate::keyspace::{KeySpace, KeySpaceAccum};
|
||||
use crate::metrics::{
|
||||
RELSIZE_CACHE_ENTRIES, RELSIZE_CACHE_HITS, RELSIZE_CACHE_MISSES, RELSIZE_CACHE_MISSES_OLD,
|
||||
@@ -209,7 +210,9 @@ impl Timeline {
|
||||
let pages: smallvec::SmallVec<[_; 1]> = smallvec::smallvec![(tag, blknum)];
|
||||
let res = self
|
||||
.get_rel_page_at_lsn_batched(
|
||||
pages.iter().map(|(tag, blknum)| (tag, blknum)),
|
||||
pages
|
||||
.iter()
|
||||
.map(|(tag, blknum)| (tag, blknum, ctx.attached_child())),
|
||||
effective_lsn,
|
||||
io_concurrency.clone(),
|
||||
ctx,
|
||||
@@ -248,7 +251,7 @@ impl Timeline {
|
||||
/// The ordering of the returned vec corresponds to the ordering of `pages`.
|
||||
pub(crate) async fn get_rel_page_at_lsn_batched(
|
||||
&self,
|
||||
pages: impl ExactSizeIterator<Item = (&RelTag, &BlockNumber)>,
|
||||
pages: impl ExactSizeIterator<Item = (&RelTag, &BlockNumber, RequestContext)>,
|
||||
effective_lsn: Lsn,
|
||||
io_concurrency: IoConcurrency,
|
||||
ctx: &RequestContext,
|
||||
@@ -262,8 +265,11 @@ impl Timeline {
|
||||
let mut result = Vec::with_capacity(pages.len());
|
||||
let result_slots = result.spare_capacity_mut();
|
||||
|
||||
let mut keys_slots: BTreeMap<Key, smallvec::SmallVec<[usize; 1]>> = BTreeMap::default();
|
||||
for (response_slot_idx, (tag, blknum)) in pages.enumerate() {
|
||||
let mut keys_slots: BTreeMap<Key, smallvec::SmallVec<[(usize, RequestContext); 1]>> =
|
||||
BTreeMap::default();
|
||||
|
||||
let mut perf_instrument = false;
|
||||
for (response_slot_idx, (tag, blknum, ctx)) in pages.enumerate() {
|
||||
if tag.relnode == 0 {
|
||||
result_slots[response_slot_idx].write(Err(PageReconstructError::Other(
|
||||
RelationError::InvalidRelnode.into(),
|
||||
@@ -274,7 +280,16 @@ impl Timeline {
|
||||
}
|
||||
|
||||
let nblocks = match self
|
||||
.get_rel_size(*tag, Version::Lsn(effective_lsn), ctx)
|
||||
.get_rel_size(*tag, Version::Lsn(effective_lsn), &ctx)
|
||||
.maybe_perf_instrument(&ctx, |crnt_perf_span| {
|
||||
info_span!(
|
||||
target: PERF_TRACE_TARGET,
|
||||
parent: crnt_perf_span,
|
||||
"GET_REL_SIZE",
|
||||
reltag=%tag,
|
||||
lsn=%effective_lsn,
|
||||
)
|
||||
})
|
||||
.await
|
||||
{
|
||||
Ok(nblocks) => nblocks,
|
||||
@@ -297,8 +312,12 @@ impl Timeline {
|
||||
|
||||
let key = rel_block_to_key(*tag, *blknum);
|
||||
|
||||
if ctx.has_perf_span() {
|
||||
perf_instrument = true;
|
||||
}
|
||||
|
||||
let key_slots = keys_slots.entry(key).or_default();
|
||||
key_slots.push(response_slot_idx);
|
||||
key_slots.push((response_slot_idx, ctx));
|
||||
}
|
||||
|
||||
let keyspace = {
|
||||
@@ -314,16 +333,34 @@ impl Timeline {
|
||||
acc.to_keyspace()
|
||||
};
|
||||
|
||||
match self
|
||||
.get_vectored(keyspace, effective_lsn, io_concurrency, ctx)
|
||||
.await
|
||||
{
|
||||
let ctx = match perf_instrument {
|
||||
true => RequestContextBuilder::from(ctx)
|
||||
.root_perf_span(|| {
|
||||
info_span!(
|
||||
target: PERF_TRACE_TARGET,
|
||||
"GET_VECTORED",
|
||||
tenant_id = %self.tenant_shard_id.tenant_id,
|
||||
timeline_id = %self.timeline_id,
|
||||
lsn = %effective_lsn,
|
||||
shard = %self.tenant_shard_id.shard_slug(),
|
||||
)
|
||||
})
|
||||
.attached_child(),
|
||||
false => ctx.attached_child(),
|
||||
};
|
||||
|
||||
let res = self
|
||||
.get_vectored(keyspace, effective_lsn, io_concurrency, &ctx)
|
||||
.maybe_perf_instrument(&ctx, |current_perf_span| current_perf_span.clone())
|
||||
.await;
|
||||
|
||||
match res {
|
||||
Ok(results) => {
|
||||
for (key, res) in results {
|
||||
let mut key_slots = keys_slots.remove(&key).unwrap().into_iter();
|
||||
let first_slot = key_slots.next().unwrap();
|
||||
let (first_slot, first_req_ctx) = key_slots.next().unwrap();
|
||||
|
||||
for slot in key_slots {
|
||||
for (slot, req_ctx) in key_slots {
|
||||
let clone = match &res {
|
||||
Ok(buf) => Ok(buf.clone()),
|
||||
Err(err) => Err(match err {
|
||||
@@ -341,17 +378,22 @@ impl Timeline {
|
||||
};
|
||||
|
||||
result_slots[slot].write(clone);
|
||||
// There is no standardized way to express that the batched span followed from N request spans.
|
||||
// So, abuse the system and mark the request contexts as follows_from the batch span, so we get
|
||||
// some linkage in our trace viewer. It allows us to answer: which GET_VECTORED did this GET_PAGE wait for.
|
||||
req_ctx.perf_follows_from(&ctx);
|
||||
slots_filled += 1;
|
||||
}
|
||||
|
||||
result_slots[first_slot].write(res);
|
||||
first_req_ctx.perf_follows_from(&ctx);
|
||||
slots_filled += 1;
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
// this cannot really happen because get_vectored only errors globally on invalid LSN or too large batch size
|
||||
// (We enforce the max batch size outside of this function, in the code that constructs the batch request.)
|
||||
for slot in keys_slots.values().flatten() {
|
||||
for (slot, req_ctx) in keys_slots.values().flatten() {
|
||||
// this whole `match` is a lot like `From<GetVectoredError> for PageReconstructError`
|
||||
// but without taking ownership of the GetVectoredError
|
||||
let err = match &err {
|
||||
@@ -383,6 +425,7 @@ impl Timeline {
|
||||
}
|
||||
};
|
||||
|
||||
req_ctx.perf_follows_from(&ctx);
|
||||
result_slots[*slot].write(err);
|
||||
}
|
||||
|
||||
|
||||
@@ -219,8 +219,7 @@ pageserver_runtime!(MGMT_REQUEST_RUNTIME, "mgmt request worker");
|
||||
pageserver_runtime!(WALRECEIVER_RUNTIME, "walreceiver worker");
|
||||
pageserver_runtime!(BACKGROUND_RUNTIME, "background op worker");
|
||||
// Bump this number when adding a new pageserver_runtime!
|
||||
// SAFETY: it's obviously correct
|
||||
const NUM_MULTIPLE_RUNTIMES: NonZeroUsize = unsafe { NonZeroUsize::new_unchecked(4) };
|
||||
const NUM_MULTIPLE_RUNTIMES: NonZeroUsize = NonZeroUsize::new(4).unwrap();
|
||||
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
pub struct PageserverTaskId(u64);
|
||||
|
||||
@@ -3248,17 +3248,23 @@ impl Tenant {
|
||||
async fn housekeeping(&self) {
|
||||
// Call through to all timelines to freeze ephemeral layers as needed. This usually happens
|
||||
// during ingest, but we don't want idle timelines to hold open layers for too long.
|
||||
let timelines = self
|
||||
.timelines
|
||||
.lock()
|
||||
.unwrap()
|
||||
.values()
|
||||
.filter(|tli| tli.is_active())
|
||||
.cloned()
|
||||
.collect_vec();
|
||||
//
|
||||
// We don't do this if the tenant can't upload layers (i.e. it's in stale attachment mode).
|
||||
// We don't run compaction in this case either, and don't want to keep flushing tiny L0
|
||||
// layers that won't be compacted down.
|
||||
if self.tenant_conf.load().location.may_upload_layers_hint() {
|
||||
let timelines = self
|
||||
.timelines
|
||||
.lock()
|
||||
.unwrap()
|
||||
.values()
|
||||
.filter(|tli| tli.is_active())
|
||||
.cloned()
|
||||
.collect_vec();
|
||||
|
||||
for timeline in timelines {
|
||||
timeline.maybe_freeze_ephemeral_layer().await;
|
||||
for timeline in timelines {
|
||||
timeline.maybe_freeze_ephemeral_layer().await;
|
||||
}
|
||||
}
|
||||
|
||||
// Shut down walredo if idle.
|
||||
@@ -3683,7 +3689,7 @@ impl Tenant {
|
||||
}
|
||||
}
|
||||
}
|
||||
TenantState::Active { .. } => {
|
||||
TenantState::Active => {
|
||||
return Ok(());
|
||||
}
|
||||
TenantState::Broken { reason, .. } => {
|
||||
@@ -4199,9 +4205,9 @@ impl Tenant {
|
||||
self.cancel.child_token(),
|
||||
);
|
||||
|
||||
let timeline_ctx = RequestContextBuilder::extend(ctx)
|
||||
let timeline_ctx = RequestContextBuilder::from(ctx)
|
||||
.scope(context::Scope::new_timeline(&timeline))
|
||||
.build();
|
||||
.detached_child();
|
||||
|
||||
Ok((timeline, timeline_ctx))
|
||||
}
|
||||
|
||||
@@ -53,7 +53,7 @@ impl<Value: Clone> LayerCoverage<Value> {
|
||||
///
|
||||
/// Complexity: O(log N)
|
||||
fn add_node(&mut self, key: i128) {
|
||||
let value = match self.nodes.range(..=key).last() {
|
||||
let value = match self.nodes.range(..=key).next_back() {
|
||||
Some((_, Some(v))) => Some(v.clone()),
|
||||
Some((_, None)) => None,
|
||||
None => None,
|
||||
|
||||
@@ -58,7 +58,7 @@ use crate::{InitializationOrder, TEMP_FILE_SUFFIX};
|
||||
|
||||
/// For a tenant that appears in TenantsMap, it may either be
|
||||
/// - `Attached`: has a full Tenant object, is elegible to service
|
||||
/// reads and ingest WAL.
|
||||
/// reads and ingest WAL.
|
||||
/// - `Secondary`: is only keeping a local cache warm.
|
||||
///
|
||||
/// Secondary is a totally distinct state rather than being a mode of a `Tenant`, because
|
||||
|
||||
@@ -642,6 +642,7 @@ impl RemoteTimelineClient {
|
||||
cancel,
|
||||
)
|
||||
.measure_remote_op(
|
||||
Option::<TaskKind>::None,
|
||||
RemoteOpFileKind::Index,
|
||||
RemoteOpKind::Download,
|
||||
Arc::clone(&self.metrics),
|
||||
@@ -739,6 +740,7 @@ impl RemoteTimelineClient {
|
||||
ctx,
|
||||
)
|
||||
.measure_remote_op(
|
||||
Some(ctx.task_kind()),
|
||||
RemoteOpFileKind::Layer,
|
||||
RemoteOpKind::Download,
|
||||
Arc::clone(&self.metrics),
|
||||
@@ -2175,6 +2177,7 @@ impl RemoteTimelineClient {
|
||||
&self.cancel,
|
||||
)
|
||||
.measure_remote_op(
|
||||
Some(TaskKind::RemoteUploadTask),
|
||||
RemoteOpFileKind::Layer,
|
||||
RemoteOpKind::Upload,
|
||||
Arc::clone(&self.metrics),
|
||||
@@ -2191,6 +2194,7 @@ impl RemoteTimelineClient {
|
||||
&self.cancel,
|
||||
)
|
||||
.measure_remote_op(
|
||||
Some(TaskKind::RemoteUploadTask),
|
||||
RemoteOpFileKind::Index,
|
||||
RemoteOpKind::Upload,
|
||||
Arc::clone(&self.metrics),
|
||||
|
||||
@@ -130,7 +130,7 @@ impl IndexPart {
|
||||
/// Version history
|
||||
/// - 2: added `deleted_at`
|
||||
/// - 3: no longer deserialize `timeline_layers` (serialized format is the same, but timeline_layers
|
||||
/// is always generated from the keys of `layer_metadata`)
|
||||
/// is always generated from the keys of `layer_metadata`)
|
||||
/// - 4: timeline_layers is fully removed.
|
||||
/// - 5: lineage was added
|
||||
/// - 6: last_aux_file_policy is added.
|
||||
|
||||
@@ -167,10 +167,17 @@ impl SecondaryTenant {
|
||||
|
||||
self.validate_metrics();
|
||||
|
||||
// Metrics are subtracted from and/or removed eagerly.
|
||||
// Deletions are done in the background via [`BackgroundPurges::spawn`].
|
||||
let tenant_id = self.tenant_shard_id.tenant_id.to_string();
|
||||
let shard_id = format!("{}", self.tenant_shard_id.shard_slug());
|
||||
let _ = SECONDARY_RESIDENT_PHYSICAL_SIZE.remove_label_values(&[&tenant_id, &shard_id]);
|
||||
let _ = SECONDARY_HEATMAP_TOTAL_SIZE.remove_label_values(&[&tenant_id, &shard_id]);
|
||||
|
||||
self.detail
|
||||
.lock()
|
||||
.unwrap()
|
||||
.drain_timelines(&self.tenant_shard_id, &self.resident_size_metric);
|
||||
}
|
||||
|
||||
pub(crate) fn set_config(&self, config: &SecondaryLocationConfig) {
|
||||
|
||||
@@ -4,6 +4,7 @@ use std::str::FromStr;
|
||||
use std::sync::Arc;
|
||||
use std::time::{Duration, Instant, SystemTime};
|
||||
|
||||
use crate::metrics::{STORAGE_IO_SIZE, StorageIoSizeOperation};
|
||||
use camino::Utf8PathBuf;
|
||||
use chrono::format::{DelayedFormat, StrftimeItems};
|
||||
use futures::Future;
|
||||
@@ -124,15 +125,53 @@ impl OnDiskState {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Default)]
|
||||
pub(super) struct SecondaryDetailTimeline {
|
||||
on_disk_layers: HashMap<LayerName, OnDiskState>,
|
||||
|
||||
/// We remember when layers were evicted, to prevent re-downloading them.
|
||||
pub(super) evicted_at: HashMap<LayerName, SystemTime>,
|
||||
|
||||
ctx: RequestContext,
|
||||
}
|
||||
|
||||
impl Clone for SecondaryDetailTimeline {
|
||||
fn clone(&self) -> Self {
|
||||
Self {
|
||||
on_disk_layers: self.on_disk_layers.clone(),
|
||||
evicted_at: self.evicted_at.clone(),
|
||||
// This is a bit awkward. The downloader code operates on a snapshot
|
||||
// of the secondary list to avoid locking it for extended periods of time.
|
||||
// No particularly strong reason to chose [`RequestContext::detached_child`],
|
||||
// but makes more sense than [`RequestContext::attached_child`].
|
||||
ctx: self
|
||||
.ctx
|
||||
.detached_child(self.ctx.task_kind(), self.ctx.download_behavior()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for SecondaryDetailTimeline {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct("SecondaryDetailTimeline")
|
||||
.field("on_disk_layers", &self.on_disk_layers)
|
||||
.field("evicted_at", &self.evicted_at)
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
impl SecondaryDetailTimeline {
|
||||
pub(super) fn empty(ctx: RequestContext) -> Self {
|
||||
SecondaryDetailTimeline {
|
||||
on_disk_layers: Default::default(),
|
||||
evicted_at: Default::default(),
|
||||
ctx,
|
||||
}
|
||||
}
|
||||
|
||||
pub(super) fn context(&self) -> &RequestContext {
|
||||
&self.ctx
|
||||
}
|
||||
|
||||
pub(super) fn remove_layer(
|
||||
&mut self,
|
||||
name: &LayerName,
|
||||
@@ -258,18 +297,50 @@ impl SecondaryDetail {
|
||||
|
||||
pub(super) fn remove_timeline(
|
||||
&mut self,
|
||||
tenant_shard_id: &TenantShardId,
|
||||
timeline_id: &TimelineId,
|
||||
resident_metric: &UIntGauge,
|
||||
) {
|
||||
let removed = self.timelines.remove(timeline_id);
|
||||
if let Some(removed) = removed {
|
||||
resident_metric.sub(
|
||||
removed
|
||||
.on_disk_layers
|
||||
.values()
|
||||
.map(|l| l.metadata.file_size)
|
||||
.sum(),
|
||||
);
|
||||
Self::clear_timeline_metrics(tenant_shard_id, timeline_id, removed, resident_metric);
|
||||
}
|
||||
}
|
||||
|
||||
pub(super) fn drain_timelines(
|
||||
&mut self,
|
||||
tenant_shard_id: &TenantShardId,
|
||||
resident_metric: &UIntGauge,
|
||||
) {
|
||||
for (timeline_id, removed) in self.timelines.drain() {
|
||||
Self::clear_timeline_metrics(tenant_shard_id, &timeline_id, removed, resident_metric);
|
||||
}
|
||||
}
|
||||
|
||||
fn clear_timeline_metrics(
|
||||
tenant_shard_id: &TenantShardId,
|
||||
timeline_id: &TimelineId,
|
||||
detail: SecondaryDetailTimeline,
|
||||
resident_metric: &UIntGauge,
|
||||
) {
|
||||
resident_metric.sub(
|
||||
detail
|
||||
.on_disk_layers
|
||||
.values()
|
||||
.map(|l| l.metadata.file_size)
|
||||
.sum(),
|
||||
);
|
||||
|
||||
let shard_id = format!("{}", tenant_shard_id.shard_slug());
|
||||
let tenant_id = tenant_shard_id.tenant_id.to_string();
|
||||
let timeline_id = timeline_id.to_string();
|
||||
for op in StorageIoSizeOperation::VARIANTS {
|
||||
let _ = STORAGE_IO_SIZE.remove_label_values(&[
|
||||
op,
|
||||
tenant_id.as_str(),
|
||||
shard_id.as_str(),
|
||||
timeline_id.as_str(),
|
||||
]);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -727,6 +798,7 @@ impl<'a> TenantDownloader<'a> {
|
||||
last_heatmap,
|
||||
timeline,
|
||||
&self.secondary_state.resident_size_metric,
|
||||
ctx,
|
||||
)
|
||||
.await;
|
||||
|
||||
@@ -774,7 +846,6 @@ impl<'a> TenantDownloader<'a> {
|
||||
|
||||
// Download the layers in the heatmap
|
||||
for timeline in heatmap.timelines {
|
||||
let ctx = &ctx.with_scope_secondary_timeline(tenant_shard_id, &timeline.timeline_id);
|
||||
let timeline_state = timeline_states
|
||||
.remove(&timeline.timeline_id)
|
||||
.expect("Just populated above");
|
||||
@@ -917,7 +988,11 @@ impl<'a> TenantDownloader<'a> {
|
||||
for delete_timeline in &delete_timelines {
|
||||
// We haven't removed from disk yet, but optimistically remove from in-memory state: if removal
|
||||
// from disk fails that will be a fatal error.
|
||||
detail.remove_timeline(delete_timeline, &self.secondary_state.resident_size_metric);
|
||||
detail.remove_timeline(
|
||||
self.secondary_state.get_tenant_shard_id(),
|
||||
delete_timeline,
|
||||
&self.secondary_state.resident_size_metric,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1013,7 +1088,6 @@ impl<'a> TenantDownloader<'a> {
|
||||
timeline: HeatMapTimeline,
|
||||
timeline_state: SecondaryDetailTimeline,
|
||||
deadline: Instant,
|
||||
ctx: &RequestContext,
|
||||
) -> (Result<(), UpdateError>, Vec<HeatMapLayer>) {
|
||||
// Accumulate updates to the state
|
||||
let mut touched = Vec::new();
|
||||
@@ -1044,7 +1118,12 @@ impl<'a> TenantDownloader<'a> {
|
||||
}
|
||||
|
||||
match self
|
||||
.download_layer(tenant_shard_id, &timeline_id, layer, ctx)
|
||||
.download_layer(
|
||||
tenant_shard_id,
|
||||
&timeline_id,
|
||||
layer,
|
||||
timeline_state.context(),
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(Some(layer)) => touched.push(layer),
|
||||
@@ -1155,13 +1234,16 @@ impl<'a> TenantDownloader<'a> {
|
||||
tracing::debug!(timeline_id=%timeline_id, "Downloading layers, {} in heatmap", timeline.hot_layers().count());
|
||||
|
||||
let (result, touched) = self
|
||||
.download_timeline_layers(tenant_shard_id, timeline, timeline_state, deadline, ctx)
|
||||
.download_timeline_layers(tenant_shard_id, timeline, timeline_state, deadline)
|
||||
.await;
|
||||
|
||||
// Write updates to state to record layers we just downloaded or touched, irrespective of whether the overall result was successful
|
||||
{
|
||||
let mut detail = self.secondary_state.detail.lock().unwrap();
|
||||
let timeline_detail = detail.timelines.entry(timeline_id).or_default();
|
||||
let timeline_detail = detail.timelines.entry(timeline_id).or_insert_with(|| {
|
||||
let ctx = ctx.with_scope_secondary_timeline(tenant_shard_id, &timeline_id);
|
||||
SecondaryDetailTimeline::empty(ctx)
|
||||
});
|
||||
|
||||
tracing::info!("Wrote timeline_detail for {} touched layers", touched.len());
|
||||
touched.into_iter().for_each(|t| {
|
||||
@@ -1295,10 +1377,12 @@ async fn init_timeline_state(
|
||||
last_heatmap: Option<&HeatMapTimeline>,
|
||||
heatmap: &HeatMapTimeline,
|
||||
resident_metric: &UIntGauge,
|
||||
ctx: &RequestContext,
|
||||
) -> SecondaryDetailTimeline {
|
||||
let timeline_path = conf.timeline_path(tenant_shard_id, &heatmap.timeline_id);
|
||||
let mut detail = SecondaryDetailTimeline::default();
|
||||
let ctx = ctx.with_scope_secondary_timeline(tenant_shard_id, &heatmap.timeline_id);
|
||||
let mut detail = SecondaryDetailTimeline::empty(ctx);
|
||||
|
||||
let timeline_path = conf.timeline_path(tenant_shard_id, &heatmap.timeline_id);
|
||||
let mut dir = match tokio::fs::read_dir(&timeline_path).await {
|
||||
Ok(d) => d,
|
||||
Err(e) => {
|
||||
|
||||
@@ -13,13 +13,13 @@ pub mod merge_iterator;
|
||||
use std::cmp::Ordering;
|
||||
use std::collections::hash_map::Entry;
|
||||
use std::collections::{BinaryHeap, HashMap};
|
||||
use std::future::Future;
|
||||
use std::ops::Range;
|
||||
use std::pin::Pin;
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::AtomicUsize;
|
||||
use std::time::{Duration, SystemTime, UNIX_EPOCH};
|
||||
|
||||
use crate::PERF_TRACE_TARGET;
|
||||
pub use batch_split_writer::{BatchLayerWriter, SplitDeltaLayerWriter, SplitImageLayerWriter};
|
||||
use bytes::Bytes;
|
||||
pub use delta_layer::{DeltaLayer, DeltaLayerWriter, ValueRef};
|
||||
@@ -34,7 +34,7 @@ use pageserver_api::key::Key;
|
||||
use pageserver_api::keyspace::{KeySpace, KeySpaceRandomAccum};
|
||||
use pageserver_api::record::NeonWalRecord;
|
||||
use pageserver_api::value::Value;
|
||||
use tracing::{Instrument, trace};
|
||||
use tracing::{Instrument, info_span, trace};
|
||||
use utils::lsn::Lsn;
|
||||
use utils::sync::gate::GateGuard;
|
||||
|
||||
@@ -43,7 +43,9 @@ use super::PageReconstructError;
|
||||
use super::layer_map::InMemoryLayerDesc;
|
||||
use super::timeline::{GetVectoredError, ReadPath};
|
||||
use crate::config::PageServerConf;
|
||||
use crate::context::{AccessStatsBehavior, RequestContext};
|
||||
use crate::context::{
|
||||
AccessStatsBehavior, PerfInstrumentFutureExt, RequestContext, RequestContextBuilder,
|
||||
};
|
||||
|
||||
pub fn range_overlaps<T>(a: &Range<T>, b: &Range<T>) -> bool
|
||||
where
|
||||
@@ -874,13 +876,37 @@ impl ReadableLayer {
|
||||
) -> Result<(), GetVectoredError> {
|
||||
match self {
|
||||
ReadableLayer::PersistentLayer(layer) => {
|
||||
let ctx = RequestContextBuilder::from(ctx)
|
||||
.perf_span(|crnt_perf_span| {
|
||||
info_span!(
|
||||
target: PERF_TRACE_TARGET,
|
||||
parent: crnt_perf_span,
|
||||
"PLAN_LAYER",
|
||||
layer = %layer
|
||||
)
|
||||
})
|
||||
.attached_child();
|
||||
|
||||
layer
|
||||
.get_values_reconstruct_data(keyspace, lsn_range, reconstruct_state, ctx)
|
||||
.get_values_reconstruct_data(keyspace, lsn_range, reconstruct_state, &ctx)
|
||||
.maybe_perf_instrument(&ctx, |crnt_perf_span| crnt_perf_span.clone())
|
||||
.await
|
||||
}
|
||||
ReadableLayer::InMemoryLayer(layer) => {
|
||||
let ctx = RequestContextBuilder::from(ctx)
|
||||
.perf_span(|crnt_perf_span| {
|
||||
info_span!(
|
||||
target: PERF_TRACE_TARGET,
|
||||
parent: crnt_perf_span,
|
||||
"PLAN_LAYER",
|
||||
layer = %layer
|
||||
)
|
||||
})
|
||||
.attached_child();
|
||||
|
||||
layer
|
||||
.get_values_reconstruct_data(keyspace, lsn_range, reconstruct_state, ctx)
|
||||
.get_values_reconstruct_data(keyspace, lsn_range, reconstruct_state, &ctx)
|
||||
.maybe_perf_instrument(&ctx, |crnt_perf_span| crnt_perf_span.clone())
|
||||
.await
|
||||
}
|
||||
}
|
||||
|
||||
@@ -896,9 +896,9 @@ impl DeltaLayerInner {
|
||||
where
|
||||
Reader: BlockReader + Clone,
|
||||
{
|
||||
let ctx = RequestContextBuilder::extend(ctx)
|
||||
let ctx = RequestContextBuilder::from(ctx)
|
||||
.page_content_kind(PageContentKind::DeltaLayerBtreeNode)
|
||||
.build();
|
||||
.attached_child();
|
||||
|
||||
for range in keyspace.ranges.iter() {
|
||||
let mut range_end_handled = false;
|
||||
@@ -1105,9 +1105,9 @@ impl DeltaLayerInner {
|
||||
all_keys.push(entry);
|
||||
true
|
||||
},
|
||||
&RequestContextBuilder::extend(ctx)
|
||||
&RequestContextBuilder::from(ctx)
|
||||
.page_content_kind(PageContentKind::DeltaLayerBtreeNode)
|
||||
.build(),
|
||||
.attached_child(),
|
||||
)
|
||||
.await?;
|
||||
if let Some(last) = all_keys.last_mut() {
|
||||
|
||||
@@ -481,9 +481,9 @@ impl ImageLayerInner {
|
||||
let tree_reader =
|
||||
DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, block_reader);
|
||||
|
||||
let ctx = RequestContextBuilder::extend(ctx)
|
||||
let ctx = RequestContextBuilder::from(ctx)
|
||||
.page_content_kind(PageContentKind::ImageLayerBtreeNode)
|
||||
.build();
|
||||
.attached_child();
|
||||
|
||||
for range in keyspace.ranges.iter() {
|
||||
let mut range_end_handled = false;
|
||||
|
||||
@@ -421,9 +421,9 @@ impl InMemoryLayer {
|
||||
reconstruct_state: &mut ValuesReconstructState,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<(), GetVectoredError> {
|
||||
let ctx = RequestContextBuilder::extend(ctx)
|
||||
let ctx = RequestContextBuilder::from(ctx)
|
||||
.page_content_kind(PageContentKind::InMemoryLayer)
|
||||
.build();
|
||||
.attached_child();
|
||||
|
||||
let inner = self.inner.read().await;
|
||||
|
||||
|
||||
@@ -3,12 +3,13 @@ use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
|
||||
use std::sync::{Arc, Weak};
|
||||
use std::time::{Duration, SystemTime};
|
||||
|
||||
use crate::PERF_TRACE_TARGET;
|
||||
use anyhow::Context;
|
||||
use camino::{Utf8Path, Utf8PathBuf};
|
||||
use pageserver_api::keyspace::KeySpace;
|
||||
use pageserver_api::models::HistoricLayerInfo;
|
||||
use pageserver_api::shard::{ShardIdentity, ShardIndex, TenantShardId};
|
||||
use tracing::Instrument;
|
||||
use tracing::{Instrument, info_span};
|
||||
use utils::generation::Generation;
|
||||
use utils::id::TimelineId;
|
||||
use utils::lsn::Lsn;
|
||||
@@ -18,7 +19,7 @@ use super::delta_layer::{self};
|
||||
use super::image_layer::{self};
|
||||
use super::{
|
||||
AsLayerDesc, ImageLayerWriter, LayerAccessStats, LayerAccessStatsReset, LayerName,
|
||||
LayerVisibilityHint, PersistentLayerDesc, ValuesReconstructState,
|
||||
LayerVisibilityHint, PerfInstrumentFutureExt, PersistentLayerDesc, ValuesReconstructState,
|
||||
};
|
||||
use crate::config::PageServerConf;
|
||||
use crate::context::{DownloadBehavior, RequestContext, RequestContextBuilder};
|
||||
@@ -324,16 +325,29 @@ impl Layer {
|
||||
reconstruct_data: &mut ValuesReconstructState,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<(), GetVectoredError> {
|
||||
let downloaded =
|
||||
let downloaded = {
|
||||
let ctx = RequestContextBuilder::from(ctx)
|
||||
.perf_span(|crnt_perf_span| {
|
||||
info_span!(
|
||||
target: PERF_TRACE_TARGET,
|
||||
parent: crnt_perf_span,
|
||||
"GET_LAYER",
|
||||
)
|
||||
})
|
||||
.attached_child();
|
||||
|
||||
self.0
|
||||
.get_or_maybe_download(true, ctx)
|
||||
.get_or_maybe_download(true, &ctx)
|
||||
.maybe_perf_instrument(&ctx, |crnt_perf_context| crnt_perf_context.clone())
|
||||
.await
|
||||
.map_err(|err| match err {
|
||||
DownloadError::TimelineShutdown | DownloadError::DownloadCancelled => {
|
||||
GetVectoredError::Cancelled
|
||||
}
|
||||
other => GetVectoredError::Other(anyhow::anyhow!(other)),
|
||||
})?;
|
||||
})?
|
||||
};
|
||||
|
||||
let this = ResidentLayer {
|
||||
downloaded: downloaded.clone(),
|
||||
owner: self.clone(),
|
||||
@@ -341,9 +355,20 @@ impl Layer {
|
||||
|
||||
self.record_access(ctx);
|
||||
|
||||
let ctx = RequestContextBuilder::from(ctx)
|
||||
.perf_span(|crnt_perf_span| {
|
||||
info_span!(
|
||||
target: PERF_TRACE_TARGET,
|
||||
parent: crnt_perf_span,
|
||||
"VISIT_LAYER",
|
||||
)
|
||||
})
|
||||
.attached_child();
|
||||
|
||||
downloaded
|
||||
.get_values_reconstruct_data(this, keyspace, lsn_range, reconstruct_data, ctx)
|
||||
.get_values_reconstruct_data(this, keyspace, lsn_range, reconstruct_data, &ctx)
|
||||
.instrument(tracing::debug_span!("get_values_reconstruct_data", layer=%self))
|
||||
.maybe_perf_instrument(&ctx, |crnt_perf_span| crnt_perf_span.clone())
|
||||
.await
|
||||
.map_err(|err| match err {
|
||||
GetVectoredError::Other(err) => GetVectoredError::Other(
|
||||
@@ -950,6 +975,10 @@ impl LayerInner {
|
||||
allow_download: bool,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<Arc<DownloadedLayer>, DownloadError> {
|
||||
let mut wait_for_download_recorder =
|
||||
scopeguard::guard(utils::elapsed_accum::ElapsedAccum::default(), |accum| {
|
||||
ctx.ondemand_download_wait_observe(accum.get());
|
||||
});
|
||||
let (weak, permit) = {
|
||||
// get_or_init_detached can:
|
||||
// - be fast (mutex lock) OR uncontested semaphore permit acquire
|
||||
@@ -958,7 +987,7 @@ impl LayerInner {
|
||||
|
||||
let locked = self
|
||||
.inner
|
||||
.get_or_init_detached()
|
||||
.get_or_init_detached_measured(Some(&mut wait_for_download_recorder))
|
||||
.await
|
||||
.map(|mut guard| guard.get_and_upgrade().ok_or(guard));
|
||||
|
||||
@@ -988,6 +1017,7 @@ impl LayerInner {
|
||||
Err(permit) => (None, permit),
|
||||
}
|
||||
};
|
||||
let _guard = wait_for_download_recorder.guard();
|
||||
|
||||
if let Some(weak) = weak {
|
||||
// only drop the weak after dropping the heavier_once_cell guard
|
||||
@@ -1045,15 +1075,34 @@ impl LayerInner {
|
||||
return Err(DownloadError::DownloadRequired);
|
||||
}
|
||||
|
||||
let download_ctx = ctx.detached_child(TaskKind::LayerDownload, DownloadBehavior::Download);
|
||||
let ctx = if ctx.has_perf_span() {
|
||||
let dl_ctx = RequestContextBuilder::from(ctx)
|
||||
.task_kind(TaskKind::LayerDownload)
|
||||
.download_behavior(DownloadBehavior::Download)
|
||||
.root_perf_span(|| {
|
||||
info_span!(
|
||||
target: PERF_TRACE_TARGET,
|
||||
"DOWNLOAD_LAYER",
|
||||
layer = %self,
|
||||
reason = %reason
|
||||
)
|
||||
})
|
||||
.detached_child();
|
||||
ctx.perf_follows_from(&dl_ctx);
|
||||
dl_ctx
|
||||
} else {
|
||||
ctx.attached_child()
|
||||
};
|
||||
|
||||
async move {
|
||||
tracing::info!(%reason, "downloading on-demand");
|
||||
|
||||
let init_cancelled = scopeguard::guard((), |_| LAYER_IMPL_METRICS.inc_init_cancelled());
|
||||
let res = self
|
||||
.download_init_and_wait(timeline, permit, download_ctx)
|
||||
.download_init_and_wait(timeline, permit, ctx.attached_child())
|
||||
.maybe_perf_instrument(&ctx, |crnt_perf_span| crnt_perf_span.clone())
|
||||
.await?;
|
||||
|
||||
scopeguard::ScopeGuard::into_inner(init_cancelled);
|
||||
Ok(res)
|
||||
}
|
||||
@@ -1158,6 +1207,7 @@ impl LayerInner {
|
||||
permit: heavier_once_cell::InitPermit,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<Arc<DownloadedLayer>, remote_storage::DownloadError> {
|
||||
let start = std::time::Instant::now();
|
||||
let result = timeline
|
||||
.remote_client
|
||||
.download_layer_file(
|
||||
@@ -1169,7 +1219,8 @@ impl LayerInner {
|
||||
ctx,
|
||||
)
|
||||
.await;
|
||||
|
||||
let latency = start.elapsed();
|
||||
let latency_millis = u64::try_from(latency.as_millis()).unwrap();
|
||||
match result {
|
||||
Ok(size) => {
|
||||
assert_eq!(size, self.desc.file_size);
|
||||
@@ -1185,9 +1236,8 @@ impl LayerInner {
|
||||
Err(e) => {
|
||||
panic!("post-condition failed: needs_download errored: {e:?}");
|
||||
}
|
||||
}
|
||||
|
||||
tracing::info!(size=%self.desc.file_size, "on-demand download successful");
|
||||
};
|
||||
tracing::info!(size=%self.desc.file_size, %latency_millis, "on-demand download successful");
|
||||
timeline
|
||||
.metrics
|
||||
.resident_physical_size_add(self.desc.file_size);
|
||||
@@ -1216,7 +1266,7 @@ impl LayerInner {
|
||||
return Err(e);
|
||||
}
|
||||
|
||||
tracing::error!(consecutive_failures, "layer file download failed: {e:#}");
|
||||
tracing::error!(consecutive_failures, %latency_millis, "layer file download failed: {e:#}");
|
||||
|
||||
let backoff = utils::backoff::exponential_backoff_duration_seconds(
|
||||
consecutive_failures.min(u32::MAX as usize) as u32,
|
||||
@@ -1720,9 +1770,9 @@ impl DownloadedLayer {
|
||||
);
|
||||
|
||||
let res = if owner.desc.is_delta {
|
||||
let ctx = RequestContextBuilder::extend(ctx)
|
||||
let ctx = RequestContextBuilder::from(ctx)
|
||||
.page_content_kind(crate::context::PageContentKind::DeltaLayerSummary)
|
||||
.build();
|
||||
.attached_child();
|
||||
let summary = Some(delta_layer::Summary::expected(
|
||||
owner.desc.tenant_shard_id.tenant_id,
|
||||
owner.desc.timeline_id,
|
||||
@@ -1738,9 +1788,9 @@ impl DownloadedLayer {
|
||||
.await
|
||||
.map(LayerKind::Delta)
|
||||
} else {
|
||||
let ctx = RequestContextBuilder::extend(ctx)
|
||||
let ctx = RequestContextBuilder::from(ctx)
|
||||
.page_content_kind(crate::context::PageContentKind::ImageLayerSummary)
|
||||
.build();
|
||||
.attached_child();
|
||||
let lsn = owner.desc.image_layer_lsn();
|
||||
let summary = Some(image_layer::Summary::expected(
|
||||
owner.desc.tenant_shard_id.tenant_id,
|
||||
|
||||
@@ -119,6 +119,10 @@ async fn smoke_test() {
|
||||
let e = layer.evict_and_wait(FOREVER).await.unwrap_err();
|
||||
assert!(matches!(e, EvictionError::NotFound));
|
||||
|
||||
let dl_ctx = RequestContextBuilder::from(ctx)
|
||||
.download_behavior(DownloadBehavior::Download)
|
||||
.attached_child();
|
||||
|
||||
// on accesses when the layer is evicted, it will automatically be downloaded.
|
||||
let img_after = {
|
||||
let mut data = ValuesReconstructState::new(io_concurrency.clone());
|
||||
@@ -127,7 +131,7 @@ async fn smoke_test() {
|
||||
controlfile_keyspace.clone(),
|
||||
Lsn(0x10)..Lsn(0x11),
|
||||
&mut data,
|
||||
ctx,
|
||||
&dl_ctx,
|
||||
)
|
||||
.instrument(download_span.clone())
|
||||
.await
|
||||
@@ -177,7 +181,7 @@ async fn smoke_test() {
|
||||
|
||||
// plain downloading is rarely needed
|
||||
layer
|
||||
.download_and_keep_resident(ctx)
|
||||
.download_and_keep_resident(&dl_ctx)
|
||||
.instrument(download_span)
|
||||
.await
|
||||
.unwrap();
|
||||
@@ -645,9 +649,10 @@ async fn cancelled_get_or_maybe_download_does_not_cancel_eviction() {
|
||||
let ctx = ctx.with_scope_timeline(&timeline);
|
||||
|
||||
// This test does downloads
|
||||
let ctx = RequestContextBuilder::extend(&ctx)
|
||||
let ctx = RequestContextBuilder::from(&ctx)
|
||||
.download_behavior(DownloadBehavior::Download)
|
||||
.build();
|
||||
.attached_child();
|
||||
|
||||
let layer = {
|
||||
let mut layers = {
|
||||
let layers = timeline.layers.read().await;
|
||||
@@ -730,9 +735,9 @@ async fn evict_and_wait_does_not_wait_for_download() {
|
||||
let ctx = ctx.with_scope_timeline(&timeline);
|
||||
|
||||
// This test does downloads
|
||||
let ctx = RequestContextBuilder::extend(&ctx)
|
||||
let ctx = RequestContextBuilder::from(&ctx)
|
||||
.download_behavior(DownloadBehavior::Download)
|
||||
.build();
|
||||
.attached_child();
|
||||
|
||||
let layer = {
|
||||
let mut layers = {
|
||||
|
||||
@@ -23,6 +23,7 @@ use std::sync::atomic::{AtomicBool, AtomicU64, Ordering as AtomicOrdering};
|
||||
use std::sync::{Arc, Mutex, OnceLock, RwLock, Weak};
|
||||
use std::time::{Duration, Instant, SystemTime};
|
||||
|
||||
use crate::PERF_TRACE_TARGET;
|
||||
use anyhow::{Context, Result, anyhow, bail, ensure};
|
||||
use arc_swap::{ArcSwap, ArcSwapOption};
|
||||
use bytes::Bytes;
|
||||
@@ -96,7 +97,9 @@ use super::{
|
||||
};
|
||||
use crate::aux_file::AuxFileSizeEstimator;
|
||||
use crate::config::PageServerConf;
|
||||
use crate::context::{DownloadBehavior, RequestContext};
|
||||
use crate::context::{
|
||||
DownloadBehavior, PerfInstrumentFutureExt, RequestContext, RequestContextBuilder,
|
||||
};
|
||||
use crate::disk_usage_eviction_task::{DiskUsageEvictionInfo, EvictionCandidate, finite_f32};
|
||||
use crate::keyspace::{KeyPartitioning, KeySpace};
|
||||
use crate::l0_flush::{self, L0FlushGlobalState};
|
||||
@@ -895,6 +898,12 @@ pub(crate) struct CompactRequest {
|
||||
pub sub_compaction_max_job_size_mb: Option<u64>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, serde::Deserialize)]
|
||||
pub(crate) struct MarkInvisibleRequest {
|
||||
#[serde(default)]
|
||||
pub is_visible: Option<bool>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Default)]
|
||||
pub(crate) struct CompactOptions {
|
||||
pub flags: EnumSet<CompactFlags>,
|
||||
@@ -1283,9 +1292,22 @@ impl Timeline {
|
||||
};
|
||||
reconstruct_state.read_path = read_path;
|
||||
|
||||
let traversal_res: Result<(), _> = self
|
||||
.get_vectored_reconstruct_data(keyspace.clone(), lsn, reconstruct_state, ctx)
|
||||
.await;
|
||||
let traversal_res: Result<(), _> = {
|
||||
let ctx = RequestContextBuilder::from(ctx)
|
||||
.perf_span(|crnt_perf_span| {
|
||||
info_span!(
|
||||
target: PERF_TRACE_TARGET,
|
||||
parent: crnt_perf_span,
|
||||
"PLAN_IO",
|
||||
)
|
||||
})
|
||||
.attached_child();
|
||||
|
||||
self.get_vectored_reconstruct_data(keyspace.clone(), lsn, reconstruct_state, &ctx)
|
||||
.maybe_perf_instrument(&ctx, |crnt_perf_span| crnt_perf_span.clone())
|
||||
.await
|
||||
};
|
||||
|
||||
if let Err(err) = traversal_res {
|
||||
// Wait for all the spawned IOs to complete.
|
||||
// See comments on `spawn_io` inside `storage_layer` for more details.
|
||||
@@ -1299,14 +1321,46 @@ impl Timeline {
|
||||
|
||||
let layers_visited = reconstruct_state.get_layers_visited();
|
||||
|
||||
let ctx = RequestContextBuilder::from(ctx)
|
||||
.perf_span(|crnt_perf_span| {
|
||||
info_span!(
|
||||
target: PERF_TRACE_TARGET,
|
||||
parent: crnt_perf_span,
|
||||
"RECONSTRUCT",
|
||||
)
|
||||
})
|
||||
.attached_child();
|
||||
|
||||
let futs = FuturesUnordered::new();
|
||||
for (key, state) in std::mem::take(&mut reconstruct_state.keys) {
|
||||
futs.push({
|
||||
let walredo_self = self.myself.upgrade().expect("&self method holds the arc");
|
||||
let ctx = RequestContextBuilder::from(&ctx)
|
||||
.perf_span(|crnt_perf_span| {
|
||||
info_span!(
|
||||
target: PERF_TRACE_TARGET,
|
||||
parent: crnt_perf_span,
|
||||
"RECONSTRUCT_KEY",
|
||||
key = %key,
|
||||
)
|
||||
})
|
||||
.attached_child();
|
||||
|
||||
async move {
|
||||
assert_eq!(state.situation, ValueReconstructSituation::Complete);
|
||||
|
||||
let converted = match state.collect_pending_ios().await {
|
||||
let res = state
|
||||
.collect_pending_ios()
|
||||
.maybe_perf_instrument(&ctx, |crnt_perf_span| {
|
||||
info_span!(
|
||||
target: PERF_TRACE_TARGET,
|
||||
parent: crnt_perf_span,
|
||||
"WAIT_FOR_IO_COMPLETIONS",
|
||||
)
|
||||
})
|
||||
.await;
|
||||
|
||||
let converted = match res {
|
||||
Ok(ok) => ok,
|
||||
Err(err) => {
|
||||
return (key, Err(err));
|
||||
@@ -1323,16 +1377,27 @@ impl Timeline {
|
||||
"{converted:?}"
|
||||
);
|
||||
|
||||
(
|
||||
key,
|
||||
walredo_self.reconstruct_value(key, lsn, converted).await,
|
||||
)
|
||||
let walredo_deltas = converted.num_deltas();
|
||||
let walredo_res = walredo_self
|
||||
.reconstruct_value(key, lsn, converted)
|
||||
.maybe_perf_instrument(&ctx, |crnt_perf_span| {
|
||||
info_span!(
|
||||
target: PERF_TRACE_TARGET,
|
||||
parent: crnt_perf_span,
|
||||
"WALREDO",
|
||||
deltas = %walredo_deltas,
|
||||
)
|
||||
})
|
||||
.await;
|
||||
|
||||
(key, walredo_res)
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
let results = futs
|
||||
.collect::<BTreeMap<Key, Result<Bytes, PageReconstructError>>>()
|
||||
.maybe_perf_instrument(&ctx, |crnt_perf_span| crnt_perf_span.clone())
|
||||
.await;
|
||||
|
||||
// For aux file keys (v1 or v2) the vectored read path does not return an error
|
||||
@@ -2241,7 +2306,7 @@ impl Timeline {
|
||||
.await
|
||||
.expect("holding a reference to self");
|
||||
}
|
||||
TimelineState::Active { .. } => {
|
||||
TimelineState::Active => {
|
||||
return Ok(());
|
||||
}
|
||||
TimelineState::Broken { .. } | TimelineState::Stopping => {
|
||||
@@ -2411,6 +2476,31 @@ impl Timeline {
|
||||
.unwrap_or(self.conf.default_tenant_conf.lazy_slru_download)
|
||||
}
|
||||
|
||||
/// Checks if a get page request should get perf tracing
|
||||
///
|
||||
/// The configuration priority is: tenant config override, default tenant config,
|
||||
/// pageserver config.
|
||||
pub(crate) fn is_get_page_request_sampled(&self) -> bool {
|
||||
let tenant_conf = self.tenant_conf.load();
|
||||
let ratio = tenant_conf
|
||||
.tenant_conf
|
||||
.sampling_ratio
|
||||
.flatten()
|
||||
.or(self.conf.default_tenant_conf.sampling_ratio)
|
||||
.or(self.conf.tracing.as_ref().map(|t| t.sampling_ratio));
|
||||
|
||||
match ratio {
|
||||
Some(r) => {
|
||||
if r.numerator == 0 {
|
||||
false
|
||||
} else {
|
||||
rand::thread_rng().gen_range(0..r.denominator) < r.numerator
|
||||
}
|
||||
}
|
||||
None => false,
|
||||
}
|
||||
}
|
||||
|
||||
fn get_checkpoint_distance(&self) -> u64 {
|
||||
let tenant_conf = self.tenant_conf.load();
|
||||
tenant_conf
|
||||
@@ -3869,15 +3959,30 @@ impl Timeline {
|
||||
let TimelineVisitOutcome {
|
||||
completed_keyspace: completed,
|
||||
image_covered_keyspace,
|
||||
} = Self::get_vectored_reconstruct_data_timeline(
|
||||
timeline,
|
||||
keyspace.clone(),
|
||||
cont_lsn,
|
||||
reconstruct_state,
|
||||
&self.cancel,
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
} = {
|
||||
let ctx = RequestContextBuilder::from(ctx)
|
||||
.perf_span(|crnt_perf_span| {
|
||||
info_span!(
|
||||
target: PERF_TRACE_TARGET,
|
||||
parent: crnt_perf_span,
|
||||
"PLAN_IO_TIMELINE",
|
||||
timeline = %timeline.timeline_id,
|
||||
lsn = %cont_lsn,
|
||||
)
|
||||
})
|
||||
.attached_child();
|
||||
|
||||
Self::get_vectored_reconstruct_data_timeline(
|
||||
timeline,
|
||||
keyspace.clone(),
|
||||
cont_lsn,
|
||||
reconstruct_state,
|
||||
&self.cancel,
|
||||
&ctx,
|
||||
)
|
||||
.maybe_perf_instrument(&ctx, |crnt_perf_span| crnt_perf_span.clone())
|
||||
.await?
|
||||
};
|
||||
|
||||
keyspace.remove_overlapping_with(&completed);
|
||||
|
||||
@@ -3921,8 +4026,24 @@ impl Timeline {
|
||||
|
||||
// Take the min to avoid reconstructing a page with data newer than request Lsn.
|
||||
cont_lsn = std::cmp::min(Lsn(request_lsn.0 + 1), Lsn(timeline.ancestor_lsn.0 + 1));
|
||||
|
||||
let ctx = RequestContextBuilder::from(ctx)
|
||||
.perf_span(|crnt_perf_span| {
|
||||
info_span!(
|
||||
target: PERF_TRACE_TARGET,
|
||||
parent: crnt_perf_span,
|
||||
"GET_ANCESTOR",
|
||||
timeline = %timeline.timeline_id,
|
||||
lsn = %cont_lsn,
|
||||
ancestor = %ancestor_timeline.timeline_id,
|
||||
ancestor_lsn = %timeline.ancestor_lsn
|
||||
)
|
||||
})
|
||||
.attached_child();
|
||||
|
||||
timeline_owned = timeline
|
||||
.get_ready_ancestor_timeline(ancestor_timeline, ctx)
|
||||
.get_ready_ancestor_timeline(ancestor_timeline, &ctx)
|
||||
.maybe_perf_instrument(&ctx, |crnt_perf_span| crnt_perf_span.clone())
|
||||
.await?;
|
||||
timeline = &*timeline_owned;
|
||||
};
|
||||
@@ -7253,9 +7374,9 @@ mod tests {
|
||||
|
||||
eprintln!("Downloading {layer} and re-generating heatmap");
|
||||
|
||||
let ctx = &RequestContextBuilder::extend(ctx)
|
||||
let ctx = &RequestContextBuilder::from(ctx)
|
||||
.download_behavior(crate::context::DownloadBehavior::Download)
|
||||
.build();
|
||||
.attached_child();
|
||||
|
||||
let _resident = layer
|
||||
.download_and_keep_resident(ctx)
|
||||
|
||||
@@ -26,7 +26,7 @@ use once_cell::sync::Lazy;
|
||||
use pageserver_api::config::tenant_conf_defaults::DEFAULT_CHECKPOINT_DISTANCE;
|
||||
use pageserver_api::key::{KEY_SIZE, Key};
|
||||
use pageserver_api::keyspace::{KeySpace, ShardedRange};
|
||||
use pageserver_api::models::CompactInfoResponse;
|
||||
use pageserver_api::models::{CompactInfoResponse, CompactKeyRange};
|
||||
use pageserver_api::record::NeonWalRecord;
|
||||
use pageserver_api::shard::{ShardCount, ShardIdentity, TenantShardId};
|
||||
use pageserver_api::value::Value;
|
||||
@@ -61,7 +61,7 @@ use crate::tenant::timeline::{
|
||||
DeltaLayerWriter, ImageLayerCreationOutcome, ImageLayerWriter, IoConcurrency, Layer,
|
||||
ResidentLayer, drop_rlock,
|
||||
};
|
||||
use crate::tenant::{DeltaLayer, MaybeOffloaded, gc_block};
|
||||
use crate::tenant::{DeltaLayer, MaybeOffloaded};
|
||||
use crate::virtual_file::{MaybeFatalIo, VirtualFile};
|
||||
|
||||
/// Maximum number of deltas before generating an image layer in bottom-most compaction.
|
||||
@@ -123,7 +123,6 @@ impl GcCompactionQueueItem {
|
||||
#[derive(Default)]
|
||||
struct GcCompactionGuardItems {
|
||||
notify: Option<tokio::sync::oneshot::Sender<()>>,
|
||||
gc_guard: Option<gc_block::Guard>,
|
||||
permit: Option<OwnedSemaphorePermit>,
|
||||
}
|
||||
|
||||
@@ -279,7 +278,7 @@ impl GcCompactionQueue {
|
||||
gc_compaction_ratio_percent: u64,
|
||||
) -> bool {
|
||||
const AUTO_TRIGGER_LIMIT: u64 = 150 * 1024 * 1024 * 1024; // 150GB
|
||||
if l1_size >= AUTO_TRIGGER_LIMIT || l2_size >= AUTO_TRIGGER_LIMIT {
|
||||
if l1_size + l2_size >= AUTO_TRIGGER_LIMIT {
|
||||
// Do not auto-trigger when physical size >= 150GB
|
||||
return false;
|
||||
}
|
||||
@@ -319,7 +318,12 @@ impl GcCompactionQueue {
|
||||
flags
|
||||
},
|
||||
sub_compaction: true,
|
||||
compact_key_range: None,
|
||||
// Only auto-trigger gc-compaction over the data keyspace due to concerns in
|
||||
// https://github.com/neondatabase/neon/issues/11318.
|
||||
compact_key_range: Some(CompactKeyRange {
|
||||
start: Key::MIN,
|
||||
end: Key::metadata_key_range().start,
|
||||
}),
|
||||
compact_lsn_range: None,
|
||||
sub_compaction_max_job_size_mb: None,
|
||||
},
|
||||
@@ -343,44 +347,45 @@ impl GcCompactionQueue {
|
||||
info!("compaction job id={} finished", id);
|
||||
let mut guard = self.inner.lock().unwrap();
|
||||
if let Some(items) = guard.guards.remove(&id) {
|
||||
drop(items.gc_guard);
|
||||
if let Some(tx) = items.notify {
|
||||
let _ = tx.send(());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn clear_running_job(&self) {
|
||||
let mut guard = self.inner.lock().unwrap();
|
||||
guard.running = None;
|
||||
}
|
||||
|
||||
async fn handle_sub_compaction(
|
||||
&self,
|
||||
id: GcCompactionJobId,
|
||||
options: CompactOptions,
|
||||
timeline: &Arc<Timeline>,
|
||||
gc_block: &GcBlock,
|
||||
auto: bool,
|
||||
) -> Result<(), CompactionError> {
|
||||
info!(
|
||||
"running scheduled enhanced gc bottom-most compaction with sub-compaction, splitting compaction jobs"
|
||||
);
|
||||
let jobs = timeline
|
||||
let res = timeline
|
||||
.gc_compaction_split_jobs(
|
||||
GcCompactJob::from_compact_options(options.clone()),
|
||||
options.sub_compaction_max_job_size_mb,
|
||||
)
|
||||
.await?;
|
||||
.await;
|
||||
let jobs = match res {
|
||||
Ok(jobs) => jobs,
|
||||
Err(err) => {
|
||||
warn!("cannot split gc-compaction jobs: {}, unblocked gc", err);
|
||||
self.notify_and_unblock(id);
|
||||
return Err(err);
|
||||
}
|
||||
};
|
||||
if jobs.is_empty() {
|
||||
info!("no jobs to run, skipping scheduled compaction task");
|
||||
self.notify_and_unblock(id);
|
||||
} else {
|
||||
let gc_guard = match gc_block.start().await {
|
||||
Ok(guard) => guard,
|
||||
Err(e) => {
|
||||
return Err(CompactionError::Other(anyhow!(
|
||||
"cannot run gc-compaction because gc is blocked: {}",
|
||||
e
|
||||
)));
|
||||
}
|
||||
};
|
||||
|
||||
let jobs_len = jobs.len();
|
||||
let mut pending_tasks = Vec::new();
|
||||
// gc-compaction might pick more layers or fewer layers to compact. The L2 LSN does not need to be accurate.
|
||||
@@ -415,7 +420,6 @@ impl GcCompactionQueue {
|
||||
|
||||
{
|
||||
let mut guard = self.inner.lock().unwrap();
|
||||
guard.guards.entry(id).or_default().gc_guard = Some(gc_guard);
|
||||
let mut tasks = Vec::new();
|
||||
for task in pending_tasks {
|
||||
let id = guard.next_id();
|
||||
@@ -446,7 +450,18 @@ impl GcCompactionQueue {
|
||||
if let Err(err) = &res {
|
||||
log_compaction_error(err, None, cancel.is_cancelled());
|
||||
}
|
||||
res
|
||||
match res {
|
||||
Ok(res) => Ok(res),
|
||||
Err(CompactionError::ShuttingDown) => Err(CompactionError::ShuttingDown),
|
||||
Err(_) => {
|
||||
// There are some cases where traditional gc might collect some layer
|
||||
// files causing gc-compaction cannot read the full history of the key.
|
||||
// This needs to be resolved in the long-term by improving the compaction
|
||||
// process. For now, let's simply avoid such errors triggering the
|
||||
// circuit breaker.
|
||||
Ok(CompactionOutcome::Skipped)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn iteration_inner(
|
||||
@@ -494,27 +509,32 @@ impl GcCompactionQueue {
|
||||
info!(
|
||||
"running scheduled enhanced gc bottom-most compaction with sub-compaction, splitting compaction jobs"
|
||||
);
|
||||
self.handle_sub_compaction(id, options, timeline, gc_block, auto)
|
||||
self.handle_sub_compaction(id, options, timeline, auto)
|
||||
.await?;
|
||||
} else {
|
||||
// Auto compaction always enables sub-compaction so we don't need to handle update_l2_lsn
|
||||
// in this branch.
|
||||
let gc_guard = match gc_block.start().await {
|
||||
let _gc_guard = match gc_block.start().await {
|
||||
Ok(guard) => guard,
|
||||
Err(e) => {
|
||||
self.notify_and_unblock(id);
|
||||
self.clear_running_job();
|
||||
return Err(CompactionError::Other(anyhow!(
|
||||
"cannot run gc-compaction because gc is blocked: {}",
|
||||
e
|
||||
)));
|
||||
}
|
||||
};
|
||||
{
|
||||
let mut guard = self.inner.lock().unwrap();
|
||||
guard.guards.entry(id).or_default().gc_guard = Some(gc_guard);
|
||||
}
|
||||
let compaction_result =
|
||||
timeline.compact_with_options(cancel, options, ctx).await?;
|
||||
self.notify_and_unblock(id);
|
||||
let res = timeline.compact_with_options(cancel, options, ctx).await;
|
||||
let compaction_result = match res {
|
||||
Ok(res) => res,
|
||||
Err(err) => {
|
||||
warn!(%err, "failed to run gc-compaction");
|
||||
self.notify_and_unblock(id);
|
||||
self.clear_running_job();
|
||||
return Err(err);
|
||||
}
|
||||
};
|
||||
if compaction_result == CompactionOutcome::YieldForL0 {
|
||||
yield_for_l0 = true;
|
||||
}
|
||||
@@ -522,7 +542,25 @@ impl GcCompactionQueue {
|
||||
}
|
||||
GcCompactionQueueItem::SubCompactionJob(options) => {
|
||||
// TODO: error handling, clear the queue if any task fails?
|
||||
let compaction_result = timeline.compact_with_options(cancel, options, ctx).await?;
|
||||
let _gc_guard = match gc_block.start().await {
|
||||
Ok(guard) => guard,
|
||||
Err(e) => {
|
||||
self.clear_running_job();
|
||||
return Err(CompactionError::Other(anyhow!(
|
||||
"cannot run gc-compaction because gc is blocked: {}",
|
||||
e
|
||||
)));
|
||||
}
|
||||
};
|
||||
let res = timeline.compact_with_options(cancel, options, ctx).await;
|
||||
let compaction_result = match res {
|
||||
Ok(res) => res,
|
||||
Err(err) => {
|
||||
warn!(%err, "failed to run gc-compaction subcompaction job");
|
||||
self.clear_running_job();
|
||||
return Err(err);
|
||||
}
|
||||
};
|
||||
if compaction_result == CompactionOutcome::YieldForL0 {
|
||||
// We will permenantly give up a task if we yield for L0 compaction: the preempted subcompaction job won't be running
|
||||
// again. This ensures that we don't keep doing duplicated work within gc-compaction. Not directly returning here because
|
||||
@@ -553,10 +591,7 @@ impl GcCompactionQueue {
|
||||
}
|
||||
}
|
||||
}
|
||||
{
|
||||
let mut guard = self.inner.lock().unwrap();
|
||||
guard.running = None;
|
||||
}
|
||||
self.clear_running_job();
|
||||
Ok(if yield_for_l0 {
|
||||
tracing::info!("give up gc-compaction: yield for L0 compaction");
|
||||
CompactionOutcome::YieldForL0
|
||||
@@ -1001,9 +1036,9 @@ impl Timeline {
|
||||
{
|
||||
Ok(((dense_partitioning, sparse_partitioning), lsn)) => {
|
||||
// Disables access_stats updates, so that the files we read remain candidates for eviction after we're done with them
|
||||
let image_ctx = RequestContextBuilder::extend(ctx)
|
||||
let image_ctx = RequestContextBuilder::from(ctx)
|
||||
.access_stats_behavior(AccessStatsBehavior::Skip)
|
||||
.build();
|
||||
.attached_child();
|
||||
|
||||
let mut partitioning = dense_partitioning;
|
||||
partitioning
|
||||
@@ -1209,6 +1244,10 @@ impl Timeline {
|
||||
let mut replace_image_layers = Vec::new();
|
||||
|
||||
for layer in layers_to_rewrite {
|
||||
if self.cancel.is_cancelled() {
|
||||
return Err(CompactionError::ShuttingDown);
|
||||
}
|
||||
|
||||
tracing::info!(layer=%layer, "Rewriting layer after shard split...");
|
||||
let mut image_layer_writer = ImageLayerWriter::new(
|
||||
self.conf,
|
||||
|
||||
@@ -2,10 +2,14 @@ use std::collections::HashSet;
|
||||
use std::sync::Arc;
|
||||
|
||||
use anyhow::Context;
|
||||
use bytes::Bytes;
|
||||
use http_utils::error::ApiError;
|
||||
use pageserver_api::key::Key;
|
||||
use pageserver_api::keyspace::KeySpace;
|
||||
use pageserver_api::models::DetachBehavior;
|
||||
use pageserver_api::models::detach_ancestor::AncestorDetached;
|
||||
use pageserver_api::shard::ShardIdentity;
|
||||
use pageserver_compaction::helpers::overlaps_with;
|
||||
use tokio::sync::Semaphore;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::Instrument;
|
||||
@@ -22,7 +26,10 @@ use crate::task_mgr::TaskKind;
|
||||
use crate::tenant::Tenant;
|
||||
use crate::tenant::remote_timeline_client::index::GcBlockingReason::DetachAncestor;
|
||||
use crate::tenant::storage_layer::layer::local_layer_path;
|
||||
use crate::tenant::storage_layer::{AsLayerDesc as _, DeltaLayerWriter, Layer, ResidentLayer};
|
||||
use crate::tenant::storage_layer::{
|
||||
AsLayerDesc as _, DeltaLayerWriter, ImageLayerWriter, IoConcurrency, Layer, ResidentLayer,
|
||||
ValuesReconstructState,
|
||||
};
|
||||
use crate::virtual_file::{MaybeFatalIo, VirtualFile};
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
@@ -170,6 +177,92 @@ impl Attempt {
|
||||
}
|
||||
}
|
||||
|
||||
async fn generate_tombstone_image_layer(
|
||||
detached: &Arc<Timeline>,
|
||||
ancestor: &Arc<Timeline>,
|
||||
ancestor_lsn: Lsn,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<Option<ResidentLayer>, Error> {
|
||||
tracing::info!(
|
||||
"removing non-inherited keys by writing an image layer with tombstones at the detach LSN"
|
||||
);
|
||||
let io_concurrency = IoConcurrency::spawn_from_conf(
|
||||
detached.conf,
|
||||
detached.gate.enter().map_err(|_| Error::ShuttingDown)?,
|
||||
);
|
||||
let mut reconstruct_state = ValuesReconstructState::new(io_concurrency);
|
||||
// Directly use `get_vectored_impl` to skip the max_vectored_read_key limit check. Note that the keyspace should
|
||||
// not contain too many keys, otherwise this takes a lot of memory. Currently we limit it to 10k keys in the compute.
|
||||
let key_range = Key::sparse_non_inherited_keyspace();
|
||||
// avoid generating a "future layer" which will then be removed
|
||||
let image_lsn = ancestor_lsn;
|
||||
|
||||
{
|
||||
let layers = detached.layers.read().await;
|
||||
for layer in layers.all_persistent_layers() {
|
||||
if !layer.is_delta
|
||||
&& layer.lsn_range.start == image_lsn
|
||||
&& overlaps_with(&key_range, &layer.key_range)
|
||||
{
|
||||
tracing::warn!(
|
||||
layer=%layer, "image layer at the detach LSN already exists, skipping removing aux files"
|
||||
);
|
||||
return Ok(None);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let data = ancestor
|
||||
.get_vectored_impl(
|
||||
KeySpace::single(key_range.clone()),
|
||||
image_lsn,
|
||||
&mut reconstruct_state,
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
.context("failed to retrieve aux keys")
|
||||
.map_err(|e| Error::launder(e, Error::Prepare))?;
|
||||
if !data.is_empty() {
|
||||
// TODO: is it possible that we can have an image at `image_lsn`? Unlikely because image layers are only generated
|
||||
// upon compaction but theoretically possible.
|
||||
let mut image_layer_writer = ImageLayerWriter::new(
|
||||
detached.conf,
|
||||
detached.timeline_id,
|
||||
detached.tenant_shard_id,
|
||||
&key_range,
|
||||
image_lsn,
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
.context("failed to create image layer writer")
|
||||
.map_err(Error::Prepare)?;
|
||||
for key in data.keys() {
|
||||
image_layer_writer
|
||||
.put_image(*key, Bytes::new(), ctx)
|
||||
.await
|
||||
.context("failed to write key")
|
||||
.map_err(|e| Error::launder(e, Error::Prepare))?;
|
||||
}
|
||||
let (desc, path) = image_layer_writer
|
||||
.finish(ctx)
|
||||
.await
|
||||
.context("failed to finish image layer writer for removing the metadata keys")
|
||||
.map_err(|e| Error::launder(e, Error::Prepare))?;
|
||||
let generated = Layer::finish_creating(detached.conf, detached, desc, &path)
|
||||
.map_err(|e| Error::launder(e, Error::Prepare))?;
|
||||
detached
|
||||
.remote_client
|
||||
.upload_layer_file(&generated, &detached.cancel)
|
||||
.await
|
||||
.map_err(|e| Error::launder(e, Error::Prepare))?;
|
||||
tracing::info!(layer=%generated, "wrote image layer");
|
||||
Ok(Some(generated))
|
||||
} else {
|
||||
tracing::info!("no aux keys found in ancestor");
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
|
||||
/// See [`Timeline::prepare_to_detach_from_ancestor`]
|
||||
pub(super) async fn prepare(
|
||||
detached: &Arc<Timeline>,
|
||||
@@ -352,10 +445,16 @@ pub(super) async fn prepare(
|
||||
|
||||
// TODO: copying and lsn prefix copying could be done at the same time with a single fsync after
|
||||
let mut new_layers: Vec<Layer> =
|
||||
Vec::with_capacity(straddling_branchpoint.len() + rest_of_historic.len());
|
||||
Vec::with_capacity(straddling_branchpoint.len() + rest_of_historic.len() + 1);
|
||||
|
||||
if let Some(tombstone_layer) =
|
||||
generate_tombstone_image_layer(detached, &ancestor, ancestor_lsn, ctx).await?
|
||||
{
|
||||
new_layers.push(tombstone_layer.into());
|
||||
}
|
||||
|
||||
{
|
||||
tracing::debug!(to_rewrite = %straddling_branchpoint.len(), "copying prefix of delta layers");
|
||||
tracing::info!(to_rewrite = %straddling_branchpoint.len(), "copying prefix of delta layers");
|
||||
|
||||
let mut tasks = tokio::task::JoinSet::new();
|
||||
|
||||
|
||||
@@ -32,9 +32,15 @@ impl Client {
|
||||
let Some(ref base_url) = conf.import_pgdata_upcall_api else {
|
||||
anyhow::bail!("import_pgdata_upcall_api is not configured")
|
||||
};
|
||||
let mut http_client = reqwest::Client::builder();
|
||||
for cert in &conf.ssl_ca_certs {
|
||||
http_client = http_client.add_root_certificate(cert.clone());
|
||||
}
|
||||
let http_client = http_client.build()?;
|
||||
|
||||
Ok(Self {
|
||||
base_url: base_url.to_string(),
|
||||
client: reqwest::Client::new(),
|
||||
client: http_client,
|
||||
cancel,
|
||||
authorization_header: conf
|
||||
.import_pgdata_upcall_api_token
|
||||
|
||||
@@ -25,8 +25,8 @@ impl<const A: usize> AlignedBufferMut<ConstAlign<A>> {
|
||||
/// * `align` must be a power of two,
|
||||
///
|
||||
/// * `capacity`, when rounded up to the nearest multiple of `align`,
|
||||
/// must not overflow isize (i.e., the rounded value must be
|
||||
/// less than or equal to `isize::MAX`).
|
||||
/// must not overflow isize (i.e., the rounded value must be
|
||||
/// less than or equal to `isize::MAX`).
|
||||
pub fn with_capacity(capacity: usize) -> Self {
|
||||
AlignedBufferMut {
|
||||
raw: RawAlignedBuffer::with_capacity(capacity),
|
||||
|
||||
@@ -37,8 +37,8 @@ impl<const A: usize> RawAlignedBuffer<ConstAlign<A>> {
|
||||
/// * `align` must be a power of two,
|
||||
///
|
||||
/// * `capacity`, when rounded up to the nearest multiple of `align`,
|
||||
/// must not overflow isize (i.e., the rounded value must be
|
||||
/// less than or equal to `isize::MAX`).
|
||||
/// must not overflow isize (i.e., the rounded value must be
|
||||
/// less than or equal to `isize::MAX`).
|
||||
pub fn with_capacity(capacity: usize) -> Self {
|
||||
let align = ConstAlign::<A>;
|
||||
let layout = Layout::from_size_align(capacity, align.align()).expect("Invalid layout");
|
||||
|
||||
@@ -9,4 +9,4 @@
|
||||
#define BITMAP_SET(bm, bit) (bm)[(bit) >> 3] |= (1 << ((bit) & 7))
|
||||
#define BITMAP_CLR(bm, bit) (bm)[(bit) >> 3] &= ~(1 << ((bit) & 7))
|
||||
|
||||
#endif //NEON_BITMAP_H
|
||||
#endif /* NEON_BITMAP_H */
|
||||
|
||||
@@ -13,9 +13,6 @@
|
||||
* accumulate changes. On subtransaction commit, the top of the stack
|
||||
* is merged with the table below it.
|
||||
*
|
||||
* IDENTIFICATION
|
||||
* contrib/neon/control_plane_connector.c
|
||||
*
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
|
||||
|
||||
@@ -3,9 +3,6 @@
|
||||
* extension_server.c
|
||||
* Request compute_ctl to download extension files.
|
||||
*
|
||||
* IDENTIFICATION
|
||||
* contrib/neon/extension_server.c
|
||||
*
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
#include "postgres.h"
|
||||
|
||||
@@ -3,9 +3,6 @@
|
||||
* extension_server.h
|
||||
* Request compute_ctl to download extension files.
|
||||
*
|
||||
* IDENTIFICATION
|
||||
* contrib/neon/extension_server.h
|
||||
*
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
/*
|
||||
/*-------------------------------------------------------------------------
|
||||
*
|
||||
* file_cache.c
|
||||
*
|
||||
@@ -6,10 +6,6 @@
|
||||
* Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
|
||||
* Portions Copyright (c) 1994, Regents of the University of California
|
||||
*
|
||||
*
|
||||
* IDENTIFICATION
|
||||
* pgxn/neon/file_cache.c
|
||||
*
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
|
||||
|
||||
@@ -6,10 +6,6 @@
|
||||
* Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
|
||||
* Portions Copyright (c) 1994, Regents of the University of California
|
||||
*
|
||||
*
|
||||
* IDENTIFICATION
|
||||
* contrib/neon/libpqpagestore.c
|
||||
*
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
#include "postgres.h"
|
||||
@@ -34,6 +30,7 @@
|
||||
#include "storage/lwlock.h"
|
||||
#include "storage/pg_shmem.h"
|
||||
#include "utils/guc.h"
|
||||
#include "utils/memutils.h"
|
||||
|
||||
#include "neon.h"
|
||||
#include "neon_perf_counters.h"
|
||||
|
||||
@@ -1,11 +1,11 @@
|
||||
#include "postgres.h"
|
||||
|
||||
#include <dirent.h>
|
||||
#include <limits.h>
|
||||
#include <string.h>
|
||||
#include <signal.h>
|
||||
#include <sys/stat.h>
|
||||
|
||||
#include "postgres.h"
|
||||
|
||||
#include "miscadmin.h"
|
||||
#include "postmaster/bgworker.h"
|
||||
#include "postmaster/interrupt.h"
|
||||
|
||||
@@ -1,10 +1,7 @@
|
||||
/*-------------------------------------------------------------------------
|
||||
*
|
||||
* neon.c
|
||||
* Utility functions to expose neon specific information to user
|
||||
*
|
||||
* IDENTIFICATION
|
||||
* contrib/neon/neon.c
|
||||
* Main entry point into the neon exension
|
||||
*
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
|
||||
@@ -3,15 +3,13 @@
|
||||
* neon.h
|
||||
* Functions used in the initialization of this extension.
|
||||
*
|
||||
* IDENTIFICATION
|
||||
* contrib/neon/neon.h
|
||||
*
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
|
||||
#ifndef NEON_H
|
||||
#define NEON_H
|
||||
#include "access/xlogreader.h"
|
||||
|
||||
#include "access/xlogdefs.h"
|
||||
#include "utils/wait_event.h"
|
||||
|
||||
/* GUCs */
|
||||
@@ -58,8 +56,8 @@ extern void SetNeonCurrentClusterSize(uint64 size);
|
||||
extern uint64 GetNeonCurrentClusterSize(void);
|
||||
extern void replication_feedback_get_lsns(XLogRecPtr *writeLsn, XLogRecPtr *flushLsn, XLogRecPtr *applyLsn);
|
||||
|
||||
extern void PGDLLEXPORT WalProposerSync(int argc, char *argv[]);
|
||||
extern void PGDLLEXPORT WalProposerMain(Datum main_arg);
|
||||
PGDLLEXPORT void LogicalSlotsMonitorMain(Datum main_arg);
|
||||
extern PGDLLEXPORT void WalProposerSync(int argc, char *argv[]);
|
||||
extern PGDLLEXPORT void WalProposerMain(Datum main_arg);
|
||||
extern PGDLLEXPORT void LogicalSlotsMonitorMain(Datum main_arg);
|
||||
|
||||
#endif /* NEON_H */
|
||||
|
||||
@@ -12,8 +12,8 @@
|
||||
#include "storage/procnumber.h"
|
||||
#else
|
||||
#include "storage/backendid.h"
|
||||
#include "storage/proc.h"
|
||||
#endif
|
||||
#include "storage/proc.h"
|
||||
|
||||
static const uint64 io_wait_bucket_thresholds[] = {
|
||||
2, 3, 6, 10, /* 0 us - 10 us */
|
||||
|
||||
@@ -20,6 +20,7 @@
|
||||
#include "access/xlogreader.h"
|
||||
#include "libpq/pqformat.h"
|
||||
#include "storage/fd.h"
|
||||
#include "utils/memutils.h"
|
||||
#include "utils/wait_event.h"
|
||||
|
||||
#include "libpq-fe.h"
|
||||
|
||||
@@ -8,8 +8,8 @@
|
||||
*
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
#ifndef pageserver_h
|
||||
#define pageserver_h
|
||||
#ifndef PAGESTORE_CLIENT_h
|
||||
#define PAGESTORE_CLIENT_h
|
||||
|
||||
#include "neon_pgversioncompat.h"
|
||||
|
||||
@@ -17,11 +17,8 @@
|
||||
#include "access/xlogdefs.h"
|
||||
#include RELFILEINFO_HDR
|
||||
#include "lib/stringinfo.h"
|
||||
#include "libpq/pqformat.h"
|
||||
#include "storage/block.h"
|
||||
#include "storage/buf_internals.h"
|
||||
#include "storage/smgr.h"
|
||||
#include "utils/memutils.h"
|
||||
|
||||
#define MAX_SHARDS 128
|
||||
#define MAX_PAGESERVER_CONNSTRING_SIZE 256
|
||||
@@ -277,13 +274,8 @@ typedef struct
|
||||
XLogRecPtr effective_request_lsn;
|
||||
} neon_request_lsns;
|
||||
|
||||
#if PG_MAJORVERSION_NUM < 16
|
||||
extern PGDLLEXPORT void neon_read_at_lsn(NRelFileInfo rnode, ForkNumber forkNum, BlockNumber blkno,
|
||||
neon_request_lsns request_lsns, char *buffer);
|
||||
#else
|
||||
extern PGDLLEXPORT void neon_read_at_lsn(NRelFileInfo rnode, ForkNumber forkNum, BlockNumber blkno,
|
||||
neon_request_lsns request_lsns, void *buffer);
|
||||
#endif
|
||||
extern int64 neon_dbsize(Oid dbNode);
|
||||
|
||||
/* utils for neon relsize cache */
|
||||
@@ -326,4 +318,4 @@ lfc_write(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
|
||||
return lfc_writev(rinfo, forkNum, blkno, &buffer, 1);
|
||||
}
|
||||
|
||||
#endif
|
||||
#endif /* PAGESTORE_CLIENT_H */
|
||||
|
||||
@@ -37,10 +37,6 @@
|
||||
* Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
|
||||
* Portions Copyright (c) 1994, Regents of the University of California
|
||||
*
|
||||
*
|
||||
* IDENTIFICATION
|
||||
* contrib/neon/pagestore_smgr.c
|
||||
*
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
#include "postgres.h"
|
||||
@@ -55,6 +51,7 @@
|
||||
#include "catalog/pg_class.h"
|
||||
#include "common/hashfn.h"
|
||||
#include "executor/instrument.h"
|
||||
#include "libpq/pqformat.h"
|
||||
#include "pgstat.h"
|
||||
#include "postmaster/autovacuum.h"
|
||||
#include "postmaster/interrupt.h"
|
||||
@@ -1903,7 +1900,6 @@ neon_wallog_pagev(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
|
||||
log_pages = true;
|
||||
}
|
||||
else if (XLogInsertAllowed() &&
|
||||
!ShutdownRequestPending &&
|
||||
(forknum == FSM_FORKNUM || forknum == VISIBILITYMAP_FORKNUM))
|
||||
{
|
||||
log_pages = true;
|
||||
@@ -3157,13 +3153,8 @@ neon_writeback(SMgrRelation reln, ForkNumber forknum,
|
||||
* The offsets in request_lsns, buffers, and mask are linked.
|
||||
*/
|
||||
static void
|
||||
#if PG_MAJORVERSION_NUM < 16
|
||||
neon_read_at_lsnv(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber base_blockno, neon_request_lsns *request_lsns,
|
||||
char **buffers, BlockNumber nblocks, const bits8 *mask)
|
||||
#else
|
||||
neon_read_at_lsnv(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber base_blockno, neon_request_lsns *request_lsns,
|
||||
void **buffers, BlockNumber nblocks, const bits8 *mask)
|
||||
#endif
|
||||
{
|
||||
NeonResponse *resp;
|
||||
uint64 ring_index;
|
||||
@@ -3359,13 +3350,8 @@ Retry:
|
||||
* To avoid breaking tests in the runtime please keep function signature in sync.
|
||||
*/
|
||||
void
|
||||
#if PG_MAJORVERSION_NUM < 16
|
||||
neon_read_at_lsn(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
|
||||
neon_request_lsns request_lsns, char *buffer)
|
||||
#else
|
||||
neon_read_at_lsn(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
|
||||
neon_request_lsns request_lsns, void *buffer)
|
||||
#endif
|
||||
{
|
||||
neon_read_at_lsnv(rinfo, forkNum, blkno, &request_lsns, &buffer, 1, NULL);
|
||||
}
|
||||
|
||||
@@ -6,10 +6,6 @@
|
||||
* Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
|
||||
* Portions Copyright (c) 1994, Regents of the University of California
|
||||
*
|
||||
*
|
||||
* IDENTIFICATION
|
||||
* contrib/neon/relsize_cache.c
|
||||
*
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
#include "postgres.h"
|
||||
|
||||
@@ -7,6 +7,7 @@
|
||||
|
||||
#include <stdio.h>
|
||||
|
||||
#include "libpq/pqformat.h"
|
||||
#include "miscadmin.h"
|
||||
#include "utils/datetime.h"
|
||||
#include "walproposer.h"
|
||||
|
||||
@@ -50,13 +50,8 @@ PG_FUNCTION_INFO_V1(trigger_segfault);
|
||||
* Linkage to functions in neon module.
|
||||
* The signature here would need to be updated whenever function parameters change in pagestore_smgr.c
|
||||
*/
|
||||
#if PG_MAJORVERSION_NUM < 16
|
||||
typedef void (*neon_read_at_lsn_type) (NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
|
||||
neon_request_lsns request_lsns, char *buffer);
|
||||
#else
|
||||
typedef void (*neon_read_at_lsn_type) (NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
|
||||
neon_request_lsns request_lsns, void *buffer);
|
||||
#endif
|
||||
|
||||
static neon_read_at_lsn_type neon_read_at_lsn_ptr;
|
||||
|
||||
|
||||
18
poetry.lock
generated
18
poetry.lock
generated
@@ -1,4 +1,4 @@
|
||||
# This file is automatically @generated by Poetry 2.1.1 and should not be changed by hand.
|
||||
# This file is automatically @generated by Poetry 2.1.2 and should not be changed by hand.
|
||||
|
||||
[[package]]
|
||||
name = "aiohappyeyeballs"
|
||||
@@ -1286,24 +1286,20 @@ files = [
|
||||
|
||||
[[package]]
|
||||
name = "h2"
|
||||
version = "4.1.0"
|
||||
version = "4.2.0"
|
||||
description = "Pure-Python HTTP/2 protocol implementation"
|
||||
optional = false
|
||||
python-versions = ">=3.9"
|
||||
groups = ["main"]
|
||||
files = []
|
||||
develop = false
|
||||
files = [
|
||||
{file = "h2-4.2.0-py3-none-any.whl", hash = "sha256:479a53ad425bb29af087f3458a61d30780bc818e4ebcf01f0b536ba916462ed0"},
|
||||
{file = "h2-4.2.0.tar.gz", hash = "sha256:c8a52129695e88b1a0578d8d2cc6842bbd79128ac685463b887ee278126ad01f"},
|
||||
]
|
||||
|
||||
[package.dependencies]
|
||||
hpack = ">=4.1,<5"
|
||||
hyperframe = ">=6.1,<7"
|
||||
|
||||
[package.source]
|
||||
type = "git"
|
||||
url = "https://github.com/python-hyper/h2"
|
||||
reference = "HEAD"
|
||||
resolved_reference = "0b98b244b5fd1fe96100ac14905417a3b70a4286"
|
||||
|
||||
[[package]]
|
||||
name = "hpack"
|
||||
version = "4.1.0"
|
||||
@@ -3844,4 +3840,4 @@ cffi = ["cffi (>=1.11)"]
|
||||
[metadata]
|
||||
lock-version = "2.1"
|
||||
python-versions = "^3.11"
|
||||
content-hash = "fb50cb6b291169dce3188560cdb31a14af95647318f8f0f0d718131dbaf1817a"
|
||||
content-hash = "7ab1e7b975af34b3271b7c6018fa22a261d3f73c7c0a0403b6b2bb86b5fbd36e"
|
||||
|
||||
@@ -41,7 +41,7 @@ use crate::control_plane::messages::{ColdStartInfo, MetricsAuxInfo};
|
||||
use crate::metrics::Metrics;
|
||||
|
||||
pub(crate) const EXT_NAME: &str = "pg_session_jwt";
|
||||
pub(crate) const EXT_VERSION: &str = "0.2.0";
|
||||
pub(crate) const EXT_VERSION: &str = "0.3.0";
|
||||
pub(crate) const EXT_SCHEMA: &str = "auth";
|
||||
|
||||
#[derive(Clone)]
|
||||
|
||||
@@ -43,7 +43,7 @@ websockets = "^12.0"
|
||||
clickhouse-connect = "^0.7.16"
|
||||
kafka-python = "^2.0.2"
|
||||
jwcrypto = "^1.5.6"
|
||||
h2 = {git = "https://github.com/python-hyper/h2"}
|
||||
h2 = "^4.2.0"
|
||||
types-jwcrypto = "^1.5.0.20240925"
|
||||
pyyaml = "^6.0.2"
|
||||
types-pyyaml = "^6.0.12.20240917"
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
[toolchain]
|
||||
channel = "1.85.0"
|
||||
channel = "1.86.0"
|
||||
profile = "default"
|
||||
# The default profile includes rustc, rust-std, cargo, rust-docs, rustfmt and clippy.
|
||||
# https://rust-lang.github.io/rustup/concepts/profiles.html
|
||||
|
||||
@@ -115,13 +115,17 @@ impl Client {
|
||||
"{}/v1/tenant/{}/timeline/{}",
|
||||
self.mgmt_api_endpoint, tenant_id, timeline_id
|
||||
);
|
||||
let resp = self.request(Method::DELETE, &uri, ()).await?;
|
||||
let resp = self
|
||||
.request_maybe_body(Method::DELETE, &uri, None::<()>)
|
||||
.await?;
|
||||
resp.json().await.map_err(Error::ReceiveBody)
|
||||
}
|
||||
|
||||
pub async fn delete_tenant(&self, tenant_id: TenantId) -> Result<models::TimelineDeleteResult> {
|
||||
pub async fn delete_tenant(&self, tenant_id: TenantId) -> Result<models::TenantDeleteResult> {
|
||||
let uri = format!("{}/v1/tenant/{}", self.mgmt_api_endpoint, tenant_id);
|
||||
let resp = self.request(Method::DELETE, &uri, ()).await?;
|
||||
let resp = self
|
||||
.request_maybe_body(Method::DELETE, &uri, None::<()>)
|
||||
.await?;
|
||||
resp.json().await.map_err(Error::ReceiveBody)
|
||||
}
|
||||
|
||||
@@ -197,6 +201,16 @@ impl Client {
|
||||
method: Method,
|
||||
uri: U,
|
||||
body: B,
|
||||
) -> Result<reqwest::Response> {
|
||||
self.request_maybe_body(method, uri, Some(body)).await
|
||||
}
|
||||
|
||||
/// Send the request and check that the status code is good, with an optional body.
|
||||
async fn request_maybe_body<B: serde::Serialize, U: reqwest::IntoUrl>(
|
||||
&self,
|
||||
method: Method,
|
||||
uri: U,
|
||||
body: Option<B>,
|
||||
) -> Result<reqwest::Response> {
|
||||
let res = self.request_noerror(method, uri, body).await?;
|
||||
let response = res.error_from_body().await?;
|
||||
@@ -208,12 +222,15 @@ impl Client {
|
||||
&self,
|
||||
method: Method,
|
||||
uri: U,
|
||||
body: B,
|
||||
body: Option<B>,
|
||||
) -> Result<reqwest::Response> {
|
||||
let mut req = self.client.request(method, uri);
|
||||
if let Some(value) = &self.authorization_header {
|
||||
req = req.header(reqwest::header::AUTHORIZATION, value.get_contents())
|
||||
}
|
||||
req.json(&body).send().await.map_err(Error::ReceiveBody)
|
||||
if let Some(body) = body {
|
||||
req = req.json(&body);
|
||||
}
|
||||
req.send().await.map_err(Error::ReceiveBody)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -219,7 +219,13 @@ struct Args {
|
||||
pub ssl_cert_reload_period: Duration,
|
||||
/// Trusted root CA certificates to use in https APIs.
|
||||
#[arg(long)]
|
||||
ssl_ca_file: Option<Utf8PathBuf>,
|
||||
pub ssl_ca_file: Option<Utf8PathBuf>,
|
||||
/// Flag to use https for requests to peer's safekeeper API.
|
||||
#[arg(long)]
|
||||
pub use_https_safekeeper_api: bool,
|
||||
/// Path to the JWT auth token used to authenticate with other safekeepers.
|
||||
#[arg(long)]
|
||||
auth_token_path: Option<Utf8PathBuf>,
|
||||
}
|
||||
|
||||
// Like PathBufValueParser, but allows empty string.
|
||||
@@ -338,14 +344,24 @@ async fn main() -> anyhow::Result<()> {
|
||||
};
|
||||
|
||||
// Load JWT auth token to connect to other safekeepers for pull_timeline.
|
||||
// First check if the env var is present, then check the arg with the path.
|
||||
// We want to deprecate and remove the env var method in the future.
|
||||
let sk_auth_token = match var("SAFEKEEPER_AUTH_TOKEN") {
|
||||
Ok(v) => {
|
||||
info!("loaded JWT token for authentication with safekeepers");
|
||||
Some(SecretString::from(v))
|
||||
}
|
||||
Err(VarError::NotPresent) => {
|
||||
info!("no JWT token for authentication with safekeepers detected");
|
||||
None
|
||||
if let Some(auth_token_path) = args.auth_token_path.as_ref() {
|
||||
info!(
|
||||
"loading JWT token for authentication with safekeepers from {auth_token_path}"
|
||||
);
|
||||
let auth_token = tokio::fs::read_to_string(auth_token_path).await?;
|
||||
Some(SecretString::from(auth_token.trim().to_owned()))
|
||||
} else {
|
||||
info!("no JWT token for authentication with safekeepers detected");
|
||||
None
|
||||
}
|
||||
}
|
||||
Err(_) => {
|
||||
warn!("JWT token for authentication with safekeepers is not unicode");
|
||||
@@ -399,6 +415,7 @@ async fn main() -> anyhow::Result<()> {
|
||||
ssl_cert_file: args.ssl_cert_file,
|
||||
ssl_cert_reload_period: args.ssl_cert_reload_period,
|
||||
ssl_ca_certs,
|
||||
use_https_safekeeper_api: args.use_https_safekeeper_api,
|
||||
});
|
||||
|
||||
// initialize sentry if SENTRY_DSN is provided
|
||||
|
||||
@@ -16,9 +16,9 @@ use http_utils::{RequestExt, RouterBuilder};
|
||||
use hyper::{Body, Request, Response, StatusCode};
|
||||
use postgres_ffi::WAL_SEGMENT_SIZE;
|
||||
use safekeeper_api::models::{
|
||||
AcceptorStateStatus, PullTimelineRequest, SafekeeperStatus, SkTimelineInfo, TermSwitchApiEntry,
|
||||
TimelineCopyRequest, TimelineCreateRequest, TimelineDeleteResult, TimelineStatus,
|
||||
TimelineTermBumpRequest,
|
||||
AcceptorStateStatus, PullTimelineRequest, SafekeeperStatus, SkTimelineInfo, TenantDeleteResult,
|
||||
TermSwitchApiEntry, TimelineCopyRequest, TimelineCreateRequest, TimelineDeleteResult,
|
||||
TimelineStatus, TimelineTermBumpRequest,
|
||||
};
|
||||
use safekeeper_api::{ServerInfo, membership, models};
|
||||
use storage_broker::proto::{SafekeeperTimelineInfo, TenantTimelineId as ProtoTenantTimelineId};
|
||||
@@ -83,13 +83,11 @@ async fn tenant_delete_handler(mut request: Request<Body>) -> Result<Response<Bo
|
||||
.delete_all_for_tenant(&tenant_id, action)
|
||||
.await
|
||||
.map_err(ApiError::InternalServerError)?;
|
||||
json_response(
|
||||
StatusCode::OK,
|
||||
delete_info
|
||||
.iter()
|
||||
.map(|(ttid, resp)| (format!("{}", ttid.timeline_id), *resp))
|
||||
.collect::<HashMap<String, TimelineDeleteResult>>(),
|
||||
)
|
||||
let response_body: TenantDeleteResult = delete_info
|
||||
.iter()
|
||||
.map(|(ttid, resp)| (format!("{}", ttid.timeline_id), *resp))
|
||||
.collect::<HashMap<String, TimelineDeleteResult>>();
|
||||
json_response(StatusCode::OK, response_body)
|
||||
}
|
||||
|
||||
async fn timeline_create_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
@@ -538,6 +536,7 @@ async fn record_safekeeper_info(mut request: Request<Body>) -> Result<Response<B
|
||||
peer_horizon_lsn: sk_info.peer_horizon_lsn.0,
|
||||
safekeeper_connstr: sk_info.safekeeper_connstr.unwrap_or_else(|| "".to_owned()),
|
||||
http_connstr: sk_info.http_connstr.unwrap_or_else(|| "".to_owned()),
|
||||
https_connstr: sk_info.https_connstr,
|
||||
backup_lsn: sk_info.backup_lsn.0,
|
||||
local_start_lsn: sk_info.local_start_lsn.0,
|
||||
availability_zone: None,
|
||||
|
||||
@@ -121,6 +121,7 @@ pub struct SafeKeeperConf {
|
||||
pub ssl_cert_file: Utf8PathBuf,
|
||||
pub ssl_cert_reload_period: Duration,
|
||||
pub ssl_ca_certs: Vec<Certificate>,
|
||||
pub use_https_safekeeper_api: bool,
|
||||
}
|
||||
|
||||
impl SafeKeeperConf {
|
||||
@@ -170,6 +171,7 @@ impl SafeKeeperConf {
|
||||
ssl_cert_file: Utf8PathBuf::from(defaults::DEFAULT_SSL_CERT_FILE),
|
||||
ssl_cert_reload_period: Duration::from_secs(60),
|
||||
ssl_ca_certs: Vec::new(),
|
||||
use_https_safekeeper_api: false,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -94,10 +94,10 @@ impl WalReceivers {
|
||||
|
||||
/// Get reference to locked slot contents. Slot must exist (registered
|
||||
/// earlier).
|
||||
fn get_slot<'a>(
|
||||
self: &'a Arc<WalReceivers>,
|
||||
fn get_slot(
|
||||
self: &Arc<WalReceivers>,
|
||||
id: WalReceiverId,
|
||||
) -> MappedMutexGuard<'a, WalReceiverState> {
|
||||
) -> MappedMutexGuard<'_, WalReceiverState> {
|
||||
MutexGuard::map(self.mutex.lock(), |locked| {
|
||||
locked.slots[id]
|
||||
.as_mut()
|
||||
|
||||
@@ -176,6 +176,7 @@ pub struct Donor {
|
||||
pub flush_lsn: Lsn,
|
||||
pub pg_connstr: String,
|
||||
pub http_connstr: String,
|
||||
pub https_connstr: Option<String>,
|
||||
}
|
||||
|
||||
impl From<&PeerInfo> for Donor {
|
||||
@@ -186,6 +187,7 @@ impl From<&PeerInfo> for Donor {
|
||||
flush_lsn: p.flush_lsn,
|
||||
pg_connstr: p.pg_connstr.clone(),
|
||||
http_connstr: p.http_connstr.clone(),
|
||||
https_connstr: p.https_connstr.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -236,11 +238,33 @@ async fn recover(
|
||||
conf: &SafeKeeperConf,
|
||||
) -> anyhow::Result<String> {
|
||||
// Learn donor term switch history to figure out starting point.
|
||||
let client = reqwest::Client::new();
|
||||
|
||||
let mut client = reqwest::Client::builder();
|
||||
for cert in &conf.ssl_ca_certs {
|
||||
client = client.add_root_certificate(cert.clone());
|
||||
}
|
||||
let client = client
|
||||
.build()
|
||||
.context("Failed to build http client for recover")?;
|
||||
|
||||
let url = if conf.use_https_safekeeper_api {
|
||||
if let Some(https_connstr) = donor.https_connstr.as_ref() {
|
||||
format!("https://{https_connstr}")
|
||||
} else {
|
||||
anyhow::bail!(
|
||||
"cannot recover from donor {}: \
|
||||
https is enabled, but https_connstr is not specified",
|
||||
donor.sk_id
|
||||
);
|
||||
}
|
||||
} else {
|
||||
format!("http://{}", donor.http_connstr)
|
||||
};
|
||||
|
||||
let timeline_info: TimelineStatus = client
|
||||
.get(format!(
|
||||
"http://{}/v1/tenant/{}/timeline/{}",
|
||||
donor.http_connstr, tli.ttid.tenant_id, tli.ttid.timeline_id
|
||||
"{}/v1/tenant/{}/timeline/{}",
|
||||
url, tli.ttid.tenant_id, tli.ttid.timeline_id
|
||||
))
|
||||
.send()
|
||||
.await?
|
||||
|
||||
@@ -50,6 +50,7 @@ fn peer_info_from_sk_info(sk_info: &SafekeeperTimelineInfo, ts: Instant) -> Peer
|
||||
local_start_lsn: Lsn(sk_info.local_start_lsn),
|
||||
pg_connstr: sk_info.safekeeper_connstr.clone(),
|
||||
http_connstr: sk_info.http_connstr.clone(),
|
||||
https_connstr: sk_info.https_connstr.clone(),
|
||||
ts,
|
||||
}
|
||||
}
|
||||
@@ -363,6 +364,7 @@ impl SharedState {
|
||||
.to_owned()
|
||||
.unwrap_or(conf.listen_pg_addr.clone()),
|
||||
http_connstr: conf.listen_http_addr.to_owned(),
|
||||
https_connstr: conf.listen_https_addr.to_owned(),
|
||||
backup_lsn: self.sk.state().inmem.backup_lsn.0,
|
||||
local_start_lsn: self.sk.state().local_start_lsn.0,
|
||||
availability_zone: conf.availability_zone.clone(),
|
||||
@@ -699,7 +701,7 @@ impl Timeline {
|
||||
}
|
||||
|
||||
/// Take a writing mutual exclusive lock on timeline shared_state.
|
||||
pub async fn write_shared_state<'a>(self: &'a Arc<Self>) -> WriteGuardSharedState<'a> {
|
||||
pub async fn write_shared_state(self: &Arc<Self>) -> WriteGuardSharedState<'_> {
|
||||
WriteGuardSharedState::new(self.clone(), self.mutex.write().await)
|
||||
}
|
||||
|
||||
|
||||
@@ -116,7 +116,7 @@ fn test_many_tx() -> anyhow::Result<()> {
|
||||
}
|
||||
None
|
||||
})
|
||||
.last()
|
||||
.next_back()
|
||||
.unwrap();
|
||||
|
||||
let initdb_lsn = 21623024;
|
||||
|
||||
@@ -184,6 +184,7 @@ pub fn run_server(os: NodeOs, disk: Arc<SafekeeperDisk>) -> Result<()> {
|
||||
ssl_cert_file: Utf8PathBuf::from(""),
|
||||
ssl_cert_reload_period: Duration::ZERO,
|
||||
ssl_ca_certs: Vec::new(),
|
||||
use_https_safekeeper_api: false,
|
||||
};
|
||||
|
||||
let mut global = GlobalMap::new(disk, conf.clone())?;
|
||||
|
||||
@@ -141,6 +141,7 @@ async fn publish(client: Option<BrokerClientChannel>, n_keys: u64) {
|
||||
peer_horizon_lsn: 5,
|
||||
safekeeper_connstr: "zenith-1-sk-1.local:7676".to_owned(),
|
||||
http_connstr: "zenith-1-sk-1.local:7677".to_owned(),
|
||||
https_connstr: Some("zenith-1-sk-1.local:7678".to_owned()),
|
||||
local_start_lsn: 0,
|
||||
availability_zone: None,
|
||||
standby_horizon: 0,
|
||||
|
||||
@@ -45,8 +45,10 @@ message SafekeeperTimelineInfo {
|
||||
uint64 standby_horizon = 14;
|
||||
// A connection string to use for WAL receiving.
|
||||
string safekeeper_connstr = 10;
|
||||
// HTTP endpoint connection string
|
||||
// HTTP endpoint connection string.
|
||||
string http_connstr = 13;
|
||||
// HTTPS endpoint connection string.
|
||||
optional string https_connstr = 15;
|
||||
// Availability zone of a safekeeper.
|
||||
optional string availability_zone = 11;
|
||||
}
|
||||
|
||||
@@ -764,6 +764,7 @@ mod tests {
|
||||
peer_horizon_lsn: 5,
|
||||
safekeeper_connstr: "neon-1-sk-1.local:7676".to_owned(),
|
||||
http_connstr: "neon-1-sk-1.local:7677".to_owned(),
|
||||
https_connstr: Some("neon-1-sk-1.local:7678".to_owned()),
|
||||
local_start_lsn: 0,
|
||||
availability_zone: None,
|
||||
standby_horizon: 0,
|
||||
|
||||
@@ -10,13 +10,11 @@ pub struct Client {
|
||||
}
|
||||
|
||||
impl Client {
|
||||
pub fn new(base_url: Url, jwt_token: Option<String>) -> Self {
|
||||
pub fn new(http_client: reqwest::Client, base_url: Url, jwt_token: Option<String>) -> Self {
|
||||
Self {
|
||||
base_url,
|
||||
jwt_token,
|
||||
client: reqwest::ClientBuilder::new()
|
||||
.build()
|
||||
.expect("Failed to construct http client"),
|
||||
client: http_client,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user