Mirror of https://github.com/neondatabase/neon.git (synced 2026-02-12 23:20:37 +00:00)

Compare commits: problame/v ... arpad/comp (57 commits)
Commits (SHA1, 57 total):

5e32cce2d9, 6b67135bd3, 66d3bef947, e749e73ad6, c5034f0e45, d2533c06a5, 0fdeca882c, f1bebda713,
983972f812, 2ea8d1b151, 2f70221503, 07bd0ce69e, 40e79712eb, 88b24e1593, 8fcdc22283, 2eb8b428cc,
e6a0e7ec61, 843d996cb1, c824ffe1dc, dadbd87ac1, 0e667dcd93, 14447b98ce, 8fcb236783, a9963db8c3,
0c500450fe, 3182c3361a, 80803ff098, 9b74d554b4, fce252fb2c, f132658bd9, d030cbffec, 554a6bd4a6,
9850794250, f5baac2579, 2d37db234a, 8745c0d6f2, 6c6a7f9ace, e729f28205, b6e1c09c73, 16d80128ee,
2ba414525e, 46210035c5, 81892199f6, 83eb02b07a, a71f58e69c, e6eb0020a1, eb0ca9b648, 6843fd8f89,
edc900028e, 789196572e, 425eed24e8, f67010109f, 0c3e3a8667, 82719542c6, d25f7e3dd5, fbccd1e676,
dc2ab4407f
.github/workflows/benchmarking.yml (8 changed lines, vendored)

@@ -99,7 +99,7 @@ jobs:
# Set --sparse-ordering option of pytest-order plugin
# to ensure tests are running in order of appears in the file.
# It's important for test_perf_pgbench.py::test_pgbench_remote_* tests
extra_params: -m remote_cluster --sparse-ordering --timeout 5400 --ignore test_runner/performance/test_perf_olap.py
extra_params: -m remote_cluster --sparse-ordering --timeout 5400 --ignore test_runner/performance/test_perf_olap.py --ignore test_runner/performance/test_perf_pgvector_queries.py
env:
BENCHMARK_CONNSTR: ${{ steps.create-neon-project.outputs.dsn }}
VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"
@@ -410,14 +410,14 @@ jobs:
PERF_TEST_RESULT_CONNSTR: "${{ secrets.PERF_TEST_RESULT_CONNSTR }}"
BENCHMARK_CONNSTR: ${{ steps.set-up-connstr.outputs.connstr }}

- name: Benchmark pgvector hnsw queries
- name: Benchmark pgvector queries
uses: ./.github/actions/run-python-test-set
with:
build_type: ${{ env.BUILD_TYPE }}
test_selection: performance
test_selection: performance/test_perf_pgvector_queries.py
run_in_parallel: false
save_perf_report: ${{ env.SAVE_PERF_REPORT }}
extra_params: -m remote_cluster --timeout 21600 -k test_pgbench_remote_pgvector
extra_params: -m remote_cluster --timeout 21600
env:
BENCHMARK_CONNSTR: ${{ steps.set-up-connstr.outputs.connstr }}
VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"

@@ -30,7 +30,6 @@ jobs:
check-image:
uses: ./.github/workflows/check-build-tools-image.yml

# This job uses older version of GitHub Actions because it's run on gen2 runners, which don't support node 20 (for newer versions)
build-image:
needs: [ check-image ]
if: needs.check-image.outputs.found == 'false'
@@ -55,7 +54,7 @@ jobs:
exit 1
fi

- uses: actions/checkout@v3
- uses: actions/checkout@v4

# Use custom DOCKER_CONFIG directory to avoid conflicts with default settings
# The default value is ~/.docker

.github/workflows/build_and_test.yml (66 changed lines, vendored)
@@ -299,21 +299,21 @@ jobs:
|
||||
uses: actions/cache@v4
|
||||
with:
|
||||
path: pg_install/v14
|
||||
key: v1-${{ runner.os }}-${{ matrix.build_type }}-pg-${{ steps.pg_v14_rev.outputs.pg_rev }}-${{ hashFiles('Makefile') }}
|
||||
key: v1-${{ runner.os }}-${{ matrix.build_type }}-pg-${{ steps.pg_v14_rev.outputs.pg_rev }}-${{ hashFiles('Makefile', 'Dockerfile.build-tools') }}
|
||||
|
||||
- name: Cache postgres v15 build
|
||||
id: cache_pg_15
|
||||
uses: actions/cache@v4
|
||||
with:
|
||||
path: pg_install/v15
|
||||
key: v1-${{ runner.os }}-${{ matrix.build_type }}-pg-${{ steps.pg_v15_rev.outputs.pg_rev }}-${{ hashFiles('Makefile') }}
|
||||
key: v1-${{ runner.os }}-${{ matrix.build_type }}-pg-${{ steps.pg_v15_rev.outputs.pg_rev }}-${{ hashFiles('Makefile', 'Dockerfile.build-tools') }}
|
||||
|
||||
- name: Cache postgres v16 build
|
||||
id: cache_pg_16
|
||||
uses: actions/cache@v4
|
||||
with:
|
||||
path: pg_install/v16
|
||||
key: v1-${{ runner.os }}-${{ matrix.build_type }}-pg-${{ steps.pg_v16_rev.outputs.pg_rev }}-${{ hashFiles('Makefile') }}
|
||||
key: v1-${{ runner.os }}-${{ matrix.build_type }}-pg-${{ steps.pg_v16_rev.outputs.pg_rev }}-${{ hashFiles('Makefile', 'Dockerfile.build-tools') }}
|
||||
|
||||
- name: Build postgres v14
|
||||
if: steps.cache_pg_14.outputs.cache-hit != 'true'
|
||||
@@ -337,34 +337,8 @@ jobs:
|
||||
run: |
|
||||
${cov_prefix} mold -run cargo build $CARGO_FLAGS $CARGO_FEATURES --bins --tests
|
||||
|
||||
- name: Run rust tests
|
||||
env:
|
||||
NEXTEST_RETRIES: 3
|
||||
run: |
|
||||
#nextest does not yet support running doctests
|
||||
cargo test --doc $CARGO_FLAGS $CARGO_FEATURES
|
||||
|
||||
for io_engine in std-fs tokio-epoll-uring ; do
|
||||
NEON_PAGESERVER_UNIT_TEST_VIRTUAL_FILE_IOENGINE=$io_engine ${cov_prefix} cargo nextest run $CARGO_FLAGS $CARGO_FEATURES
|
||||
done
|
||||
|
||||
# Run separate tests for real S3
|
||||
export ENABLE_REAL_S3_REMOTE_STORAGE=nonempty
|
||||
export REMOTE_STORAGE_S3_BUCKET=neon-github-ci-tests
|
||||
export REMOTE_STORAGE_S3_REGION=eu-central-1
|
||||
# Avoid `$CARGO_FEATURES` since there's no `testing` feature in the e2e tests now
|
||||
${cov_prefix} cargo nextest run $CARGO_FLAGS -E 'package(remote_storage)' -E 'test(test_real_s3)'
|
||||
|
||||
# Run separate tests for real Azure Blob Storage
|
||||
# XXX: replace region with `eu-central-1`-like region
|
||||
export ENABLE_REAL_AZURE_REMOTE_STORAGE=y
|
||||
export AZURE_STORAGE_ACCOUNT="${{ secrets.AZURE_STORAGE_ACCOUNT_DEV }}"
|
||||
export AZURE_STORAGE_ACCESS_KEY="${{ secrets.AZURE_STORAGE_ACCESS_KEY_DEV }}"
|
||||
export REMOTE_STORAGE_AZURE_CONTAINER="${{ vars.REMOTE_STORAGE_AZURE_CONTAINER }}"
|
||||
export REMOTE_STORAGE_AZURE_REGION="${{ vars.REMOTE_STORAGE_AZURE_REGION }}"
|
||||
# Avoid `$CARGO_FEATURES` since there's no `testing` feature in the e2e tests now
|
||||
${cov_prefix} cargo nextest run $CARGO_FLAGS -E 'package(remote_storage)' -E 'test(test_real_azure)'
|
||||
|
||||
# Do install *before* running rust tests because they might recompile the
|
||||
# binaries with different features/flags.
|
||||
- name: Install rust binaries
|
||||
run: |
|
||||
# Install target binaries
|
||||
@@ -405,6 +379,32 @@ jobs:
|
||||
done
|
||||
fi
|
||||
|
||||
- name: Run rust tests
|
||||
env:
|
||||
NEXTEST_RETRIES: 3
|
||||
run: |
|
||||
#nextest does not yet support running doctests
|
||||
cargo test --doc $CARGO_FLAGS $CARGO_FEATURES
|
||||
|
||||
for io_engine in std-fs tokio-epoll-uring ; do
|
||||
NEON_PAGESERVER_UNIT_TEST_VIRTUAL_FILE_IOENGINE=$io_engine ${cov_prefix} cargo nextest run $CARGO_FLAGS $CARGO_FEATURES
|
||||
done
|
||||
|
||||
# Run separate tests for real S3
|
||||
export ENABLE_REAL_S3_REMOTE_STORAGE=nonempty
|
||||
export REMOTE_STORAGE_S3_BUCKET=neon-github-ci-tests
|
||||
export REMOTE_STORAGE_S3_REGION=eu-central-1
|
||||
${cov_prefix} cargo nextest run $CARGO_FLAGS $CARGO_FEATURES -E 'package(remote_storage)' -E 'test(test_real_s3)'
|
||||
|
||||
# Run separate tests for real Azure Blob Storage
|
||||
# XXX: replace region with `eu-central-1`-like region
|
||||
export ENABLE_REAL_AZURE_REMOTE_STORAGE=y
|
||||
export AZURE_STORAGE_ACCOUNT="${{ secrets.AZURE_STORAGE_ACCOUNT_DEV }}"
|
||||
export AZURE_STORAGE_ACCESS_KEY="${{ secrets.AZURE_STORAGE_ACCESS_KEY_DEV }}"
|
||||
export REMOTE_STORAGE_AZURE_CONTAINER="${{ vars.REMOTE_STORAGE_AZURE_CONTAINER }}"
|
||||
export REMOTE_STORAGE_AZURE_REGION="${{ vars.REMOTE_STORAGE_AZURE_REGION }}"
|
||||
${cov_prefix} cargo nextest run $CARGO_FLAGS $CARGO_FEATURES -E 'package(remote_storage)' -E 'test(test_real_azure)'
|
||||
|
||||
- name: Install postgres binaries
|
||||
run: cp -a pg_install /tmp/neon/pg_install
|
||||
|
||||
@@ -858,7 +858,7 @@ jobs:
|
||||
cache-to: type=registry,ref=neondatabase/compute-node-${{ matrix.version }}:cache-${{ matrix.arch }},mode=max
|
||||
tags: |
|
||||
neondatabase/compute-node-${{ matrix.version }}:${{ needs.tag.outputs.build-tag }}-${{ matrix.arch }}
|
||||
|
||||
|
||||
- name: Build neon extensions test image
|
||||
if: matrix.version == 'v16'
|
||||
uses: docker/build-push-action@v5
|
||||
@@ -965,7 +965,7 @@ jobs:
|
||||
|
||||
steps:
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v1
|
||||
uses: actions/checkout@v4
|
||||
with:
|
||||
fetch-depth: 0
|
||||
|
||||
|
||||

.github/workflows/check-build-tools-image.yml (23 changed lines, vendored)

@@ -25,26 +25,17 @@ jobs:
found: ${{ steps.check-image.outputs.found }}

steps:
- uses: actions/checkout@v4

- name: Get build-tools image tag for the current commit
id: get-build-tools-tag
env:
# Usually, for COMMIT_SHA, we use `github.event.pull_request.head.sha || github.sha`, but here, even for PRs,
# we want to use `github.sha` i.e. point to a phantom merge commit to determine the image tag correctly.
COMMIT_SHA: ${{ github.sha }}
GH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
IMAGE_TAG: |
${{ hashFiles('Dockerfile.build-tools',
'.github/workflows/check-build-tools-image.yml',
'.github/workflows/build-build-tools-image.yml') }}
run: |
LAST_BUILD_TOOLS_SHA=$(
gh api \
-H "Accept: application/vnd.github+json" \
-H "X-GitHub-Api-Version: 2022-11-28" \
--method GET \
--field path=Dockerfile.build-tools \
--field sha=${COMMIT_SHA} \
--field per_page=1 \
--jq ".[0].sha" \
"/repos/${GITHUB_REPOSITORY}/commits"
)
echo "image-tag=${LAST_BUILD_TOOLS_SHA}" | tee -a $GITHUB_OUTPUT
echo "image-tag=${IMAGE_TAG}" | tee -a $GITHUB_OUTPUT

- name: Check if such tag found in the registry
id: check-image

Cargo.lock (10 changed lines, generated)

@@ -2946,6 +2946,15 @@ dependencies = [
"hashbrown 0.14.5",
]

[[package]]
name = "lz4_flex"
version = "0.11.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "75761162ae2b0e580d7e7c390558127e5f01b4194debd6221fd8c207fc80e3f5"
dependencies = [
"twox-hash",
]

[[package]]
name = "match_cfg"
version = "0.1.0"
@@ -3612,6 +3621,7 @@ dependencies = [
"hyper 0.14.26",
"itertools",
"leaky-bucket",
"lz4_flex",
"md5",
"metrics",
"nix 0.27.1",

@@ -110,6 +110,7 @@ jsonwebtoken = "9"
lasso = "0.7"
leaky-bucket = "1.0.1"
libc = "0.2"
lz4_flex = "0.11"
md5 = "0.7.0"
measured = { version = "0.0.21", features=["lasso"] }
measured-process = { version = "0.0.21" }
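
The Cargo.lock hunks and the `lz4_flex = "0.11"` workspace entry above add lz4_flex as a new dependency; a later Cargo.toml hunk in this comparison pulls it into the pageserver, evidently for the image-compression work. As a hedged illustration only — the crate's general-purpose block API, not the pageserver's actual call sites — a size-prepended round trip looks like this:

// Minimal sketch of the lz4_flex 0.11 block API; illustration only, not code
// from this repository.
fn main() {
    let payload: &[u8] = b"example page image bytes";

    // Compress, prepending the uncompressed length so decompression can
    // allocate the right amount up front.
    let compressed: Vec<u8> = lz4_flex::compress_prepend_size(payload);

    // Decompress; returns an error on a corrupt buffer or bad length prefix.
    let roundtripped = lz4_flex::decompress_size_prepended(&compressed)
        .expect("round-trip of freshly compressed data should succeed");

    assert_eq!(roundtripped.as_slice(), payload);
}
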
@@ -120,7 +121,7 @@ num_cpus = "1.15"
|
||||
num-traits = "0.2.15"
|
||||
once_cell = "1.13"
|
||||
opentelemetry = "0.20.0"
|
||||
opentelemetry-otlp = { version = "0.13.0", default_features=false, features = ["http-proto", "trace", "http", "reqwest-client"] }
|
||||
opentelemetry-otlp = { version = "0.13.0", default-features=false, features = ["http-proto", "trace", "http", "reqwest-client"] }
|
||||
opentelemetry-semantic-conventions = "0.12.0"
|
||||
parking_lot = "0.12"
|
||||
parquet = { version = "51.0.0", default-features = false, features = ["zstd"] }
|
||||
@@ -128,7 +129,7 @@ parquet_derive = "51.0.0"
|
||||
pbkdf2 = { version = "0.12.1", features = ["simple", "std"] }
|
||||
pin-project-lite = "0.2"
|
||||
procfs = "0.14"
|
||||
prometheus = {version = "0.13", default_features=false, features = ["process"]} # removes protobuf dependency
|
||||
prometheus = {version = "0.13", default-features=false, features = ["process"]} # removes protobuf dependency
|
||||
prost = "0.11"
|
||||
rand = "0.8"
|
||||
redis = { version = "0.25.2", features = ["tokio-rustls-comp", "keep-alive"] }
|
||||
@@ -184,7 +185,7 @@ tower-service = "0.3.2"
|
||||
tracing = "0.1"
|
||||
tracing-error = "0.2.0"
|
||||
tracing-opentelemetry = "0.21.0"
|
||||
tracing-subscriber = { version = "0.3", default_features = false, features = ["smallvec", "fmt", "tracing-log", "std", "env-filter", "json", "ansi"] }
|
||||
tracing-subscriber = { version = "0.3", default-features = false, features = ["smallvec", "fmt", "tracing-log", "std", "env-filter", "json", "ansi"] }
|
||||
twox-hash = { version = "1.6.3", default-features = false }
|
||||
url = "2.2"
|
||||
urlencoding = "2.1"
|
||||
|
||||
@@ -69,8 +69,6 @@ RUN set -e \
|
||||
&& apt install -y \
|
||||
libreadline-dev \
|
||||
libseccomp-dev \
|
||||
libicu67 \
|
||||
openssl \
|
||||
ca-certificates \
|
||||
&& rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/* \
|
||||
&& useradd -d /data neon \
|
||||
|
||||
@@ -112,6 +112,45 @@ RUN for package in Capture::Tiny DateTime Devel::Cover Digest::MD5 File::Spec JS
|
||||
&& make install \
|
||||
&& rm -rf ../lcov.tar.gz
|
||||
|
||||
# Compile and install the static OpenSSL library
|
||||
ENV OPENSSL_VERSION=3.2.2
|
||||
ENV OPENSSL_PREFIX=/usr/local/openssl
|
||||
RUN wget -O /tmp/openssl-${OPENSSL_VERSION}.tar.gz https://www.openssl.org/source/openssl-${OPENSSL_VERSION}.tar.gz && \
|
||||
echo "197149c18d9e9f292c43f0400acaba12e5f52cacfe050f3d199277ea738ec2e7 /tmp/openssl-${OPENSSL_VERSION}.tar.gz" | sha256sum --check && \
|
||||
cd /tmp && \
|
||||
tar xzvf /tmp/openssl-${OPENSSL_VERSION}.tar.gz && \
|
||||
rm /tmp/openssl-${OPENSSL_VERSION}.tar.gz && \
|
||||
cd /tmp/openssl-${OPENSSL_VERSION} && \
|
||||
./config --prefix=${OPENSSL_PREFIX} -static --static no-shared -fPIC && \
|
||||
make -j "$(nproc)" && \
|
||||
make install && \
|
||||
cd /tmp && \
|
||||
rm -rf /tmp/openssl-${OPENSSL_VERSION}
|
||||
|
||||
# Use the same version of libicu as the compute nodes so that
|
||||
# clusters created using inidb on pageserver can be used by computes.
|
||||
#
|
||||
# TODO: at this time, Dockerfile.compute-node uses the debian bullseye libicu
|
||||
# package, which is 67.1. We're duplicating that knowledge here, and also, technically,
|
||||
# Debian has a few patches on top of 67.1 that we're not adding here.
|
||||
ENV ICU_VERSION=67.1
|
||||
ENV ICU_PREFIX=/usr/local/icu
|
||||
|
||||
# Download and build static ICU
|
||||
RUN wget -O /tmp/libicu-${ICU_VERSION}.tgz https://github.com/unicode-org/icu/releases/download/release-${ICU_VERSION//./-}/icu4c-${ICU_VERSION//./_}-src.tgz && \
|
||||
echo "94a80cd6f251a53bd2a997f6f1b5ac6653fe791dfab66e1eb0227740fb86d5dc /tmp/libicu-${ICU_VERSION}.tgz" | sha256sum --check && \
|
||||
mkdir /tmp/icu && \
|
||||
pushd /tmp/icu && \
|
||||
tar -xzf /tmp/libicu-${ICU_VERSION}.tgz && \
|
||||
pushd icu/source && \
|
||||
./configure --prefix=${ICU_PREFIX} --enable-static --enable-shared=no CXXFLAGS="-fPIC" CFLAGS="-fPIC" && \
|
||||
make -j "$(nproc)" && \
|
||||
make install && \
|
||||
popd && \
|
||||
rm -rf icu && \
|
||||
rm -f /tmp/libicu-${ICU_VERSION}.tgz && \
|
||||
popd
|
||||
|
||||
# Switch to nonroot user
|
||||
USER nonroot:nonroot
|
||||
WORKDIR /home/nonroot
|
||||
@@ -141,7 +180,7 @@ WORKDIR /home/nonroot
|
||||
|
||||
# Rust
|
||||
# Please keep the version of llvm (installed above) in sync with rust llvm (`rustc --version --verbose | grep LLVM`)
|
||||
ENV RUSTC_VERSION=1.78.0
|
||||
ENV RUSTC_VERSION=1.79.0
|
||||
ENV RUSTUP_HOME="/home/nonroot/.rustup"
|
||||
ENV PATH="/home/nonroot/.cargo/bin:${PATH}"
|
||||
RUN curl -sSO https://static.rust-lang.org/rustup/dist/$(uname -m)-unknown-linux-gnu/rustup-init && whoami && \
|
||||
@@ -170,3 +209,6 @@ RUN whoami \
|
||||
&& rustup --version --verbose \
|
||||
&& rustc --version --verbose \
|
||||
&& clang --version
|
||||
|
||||
# Set following flag to check in Makefile if its running in Docker
|
||||
RUN touch /home/nonroot/.docker_build
|
||||
|
||||
@@ -246,8 +246,8 @@ COPY patches/pgvector.patch /pgvector.patch
|
||||
# By default, pgvector Makefile uses `-march=native`. We don't want that,
|
||||
# because we build the images on different machines than where we run them.
|
||||
# Pass OPTFLAGS="" to remove it.
|
||||
RUN wget https://github.com/pgvector/pgvector/archive/refs/tags/v0.7.1.tar.gz -O pgvector.tar.gz && \
|
||||
echo "fe6c8cb4e0cd1a8cb60f5badf9e1701e0fcabcfc260931c26d01e155c4dd21d1 pgvector.tar.gz" | sha256sum --check && \
|
||||
RUN wget https://github.com/pgvector/pgvector/archive/refs/tags/v0.7.2.tar.gz -O pgvector.tar.gz && \
|
||||
echo "617fba855c9bcb41a2a9bc78a78567fd2e147c72afd5bf9d37b31b9591632b30 pgvector.tar.gz" | sha256sum --check && \
|
||||
mkdir pgvector-src && cd pgvector-src && tar xzf ../pgvector.tar.gz --strip-components=1 -C . && \
|
||||
patch -p1 < /pgvector.patch && \
|
||||
make -j $(getconf _NPROCESSORS_ONLN) OPTFLAGS="" PG_CONFIG=/usr/local/pgsql/bin/pg_config && \
|
||||
@@ -979,7 +979,7 @@ RUN cd /ext-src/ && for f in *.tar.gz; \
|
||||
do echo $f; dname=$(echo $f | sed 's/\.tar.*//')-src; \
|
||||
rm -rf $dname; mkdir $dname; tar xzf $f --strip-components=1 -C $dname \
|
||||
|| exit 1; rm -f $f; done
|
||||
RUN cd /ext-src/pgvector-src && patch -p1 <../pgvector.patch
|
||||
RUN cd /ext-src/pgvector-src && patch -p1 <../pgvector.patch
|
||||
# cmake is required for the h3 test
|
||||
RUN apt-get update && apt-get install -y cmake
|
||||
RUN patch -p1 < /ext-src/pg_hintplan.patch
|
||||
|
||||

Makefile (15 changed lines)
@@ -3,6 +3,9 @@ ROOT_PROJECT_DIR := $(dir $(abspath $(lastword $(MAKEFILE_LIST))))
|
||||
# Where to install Postgres, default is ./pg_install, maybe useful for package managers
|
||||
POSTGRES_INSTALL_DIR ?= $(ROOT_PROJECT_DIR)/pg_install/
|
||||
|
||||
OPENSSL_PREFIX_DIR := /usr/local/openssl
|
||||
ICU_PREFIX_DIR := /usr/local/icu
|
||||
|
||||
#
|
||||
# We differentiate between release / debug build types using the BUILD_TYPE
|
||||
# environment variable.
|
||||
@@ -20,6 +23,16 @@ else
|
||||
$(error Bad build type '$(BUILD_TYPE)', see Makefile for options)
|
||||
endif
|
||||
|
||||
ifeq ($(shell test -e /home/nonroot/.docker_build && echo -n yes),yes)
|
||||
# Exclude static build openssl, icu for local build (MacOS, Linux)
|
||||
# Only keep for build type release and debug
|
||||
PG_CFLAGS += -I$(OPENSSL_PREFIX_DIR)/include
|
||||
PG_CONFIGURE_OPTS += --with-icu
|
||||
PG_CONFIGURE_OPTS += ICU_CFLAGS='-I/$(ICU_PREFIX_DIR)/include -DU_STATIC_IMPLEMENTATION'
|
||||
PG_CONFIGURE_OPTS += ICU_LIBS='-L$(ICU_PREFIX_DIR)/lib -L$(ICU_PREFIX_DIR)/lib64 -licui18n -licuuc -licudata -lstdc++ -Wl,-Bdynamic -lm'
|
||||
PG_CONFIGURE_OPTS += LDFLAGS='-L$(OPENSSL_PREFIX_DIR)/lib -L$(OPENSSL_PREFIX_DIR)/lib64 -L$(ICU_PREFIX_DIR)/lib -L$(ICU_PREFIX_DIR)/lib64 -Wl,-Bstatic -lssl -lcrypto -Wl,-Bdynamic -lrt -lm -ldl -lpthread'
|
||||
endif
|
||||
|
||||
UNAME_S := $(shell uname -s)
|
||||
ifeq ($(UNAME_S),Linux)
|
||||
# Seccomp BPF is only available for Linux
|
||||
@@ -28,7 +41,7 @@ else ifeq ($(UNAME_S),Darwin)
|
||||
ifndef DISABLE_HOMEBREW
|
||||
# macOS with brew-installed openssl requires explicit paths
|
||||
# It can be configured with OPENSSL_PREFIX variable
|
||||
OPENSSL_PREFIX ?= $(shell brew --prefix openssl@3)
|
||||
OPENSSL_PREFIX := $(shell brew --prefix openssl@3)
|
||||
PG_CONFIGURE_OPTS += --with-includes=$(OPENSSL_PREFIX)/include --with-libraries=$(OPENSSL_PREFIX)/lib
|
||||
PG_CONFIGURE_OPTS += PKG_CONFIG_PATH=$(shell brew --prefix icu4c)/lib/pkgconfig
|
||||
# macOS already has bison and flex in the system, but they are old and result in postgres-v14 target failure
|
||||
|
||||
@@ -735,7 +735,7 @@ fn cli() -> clap::Command {
|
||||
Arg::new("filecache-connstr")
|
||||
.long("filecache-connstr")
|
||||
.default_value(
|
||||
"host=localhost port=5432 dbname=postgres user=cloud_admin sslmode=disable",
|
||||
"host=localhost port=5432 dbname=postgres user=cloud_admin sslmode=disable application_name=vm-monitor",
|
||||
)
|
||||
.value_name("FILECACHE_CONNSTR"),
|
||||
)
|
||||
|
||||
@@ -918,38 +918,39 @@ impl ComputeNode {
|
||||
// temporarily reset max_cluster_size in config
|
||||
// to avoid the possibility of hitting the limit, while we are reconfiguring:
|
||||
// creating new extensions, roles, etc...
|
||||
config::compute_ctl_temp_override_create(pgdata_path, "neon.max_cluster_size=-1")?;
|
||||
self.pg_reload_conf()?;
|
||||
config::with_compute_ctl_tmp_override(pgdata_path, "neon.max_cluster_size=-1", || {
|
||||
self.pg_reload_conf()?;
|
||||
|
||||
let mut client = Client::connect(self.connstr.as_str(), NoTls)?;
|
||||
let mut client = Client::connect(self.connstr.as_str(), NoTls)?;
|
||||
|
||||
// Proceed with post-startup configuration. Note, that order of operations is important.
|
||||
// Disable DDL forwarding because control plane already knows about these roles/databases.
|
||||
if spec.mode == ComputeMode::Primary {
|
||||
client.simple_query("SET neon.forward_ddl = false")?;
|
||||
cleanup_instance(&mut client)?;
|
||||
handle_roles(&spec, &mut client)?;
|
||||
handle_databases(&spec, &mut client)?;
|
||||
handle_role_deletions(&spec, self.connstr.as_str(), &mut client)?;
|
||||
handle_grants(
|
||||
&spec,
|
||||
&mut client,
|
||||
self.connstr.as_str(),
|
||||
self.has_feature(ComputeFeature::AnonExtension),
|
||||
)?;
|
||||
handle_extensions(&spec, &mut client)?;
|
||||
handle_extension_neon(&mut client)?;
|
||||
// We can skip handle_migrations here because a new migration can only appear
|
||||
// if we have a new version of the compute_ctl binary, which can only happen
|
||||
// if compute got restarted, in which case we'll end up inside of apply_config
|
||||
// instead of reconfigure.
|
||||
}
|
||||
// Proceed with post-startup configuration. Note, that order of operations is important.
|
||||
// Disable DDL forwarding because control plane already knows about these roles/databases.
|
||||
if spec.mode == ComputeMode::Primary {
|
||||
client.simple_query("SET neon.forward_ddl = false")?;
|
||||
cleanup_instance(&mut client)?;
|
||||
handle_roles(&spec, &mut client)?;
|
||||
handle_databases(&spec, &mut client)?;
|
||||
handle_role_deletions(&spec, self.connstr.as_str(), &mut client)?;
|
||||
handle_grants(
|
||||
&spec,
|
||||
&mut client,
|
||||
self.connstr.as_str(),
|
||||
self.has_feature(ComputeFeature::AnonExtension),
|
||||
)?;
|
||||
handle_extensions(&spec, &mut client)?;
|
||||
handle_extension_neon(&mut client)?;
|
||||
// We can skip handle_migrations here because a new migration can only appear
|
||||
// if we have a new version of the compute_ctl binary, which can only happen
|
||||
// if compute got restarted, in which case we'll end up inside of apply_config
|
||||
// instead of reconfigure.
|
||||
}
|
||||
|
||||
// 'Close' connection
|
||||
drop(client);
|
||||
// 'Close' connection
|
||||
drop(client);
|
||||
|
||||
Ok(())
|
||||
})?;
|
||||
|
||||
// reset max_cluster_size in config back to original value and reload config
|
||||
config::compute_ctl_temp_override_remove(pgdata_path)?;
|
||||
self.pg_reload_conf()?;
|
||||
|
||||
let unknown_op = "unknown".to_string();
|
||||
@@ -1040,12 +1041,17 @@ impl ComputeNode {
|
||||
// temporarily reset max_cluster_size in config
|
||||
// to avoid the possibility of hitting the limit, while we are applying config:
|
||||
// creating new extensions, roles, etc...
|
||||
config::compute_ctl_temp_override_create(pgdata_path, "neon.max_cluster_size=-1")?;
|
||||
self.pg_reload_conf()?;
|
||||
config::with_compute_ctl_tmp_override(
|
||||
pgdata_path,
|
||||
"neon.max_cluster_size=-1",
|
||||
|| {
|
||||
self.pg_reload_conf()?;
|
||||
|
||||
self.apply_config(&compute_state)?;
|
||||
self.apply_config(&compute_state)?;
|
||||
|
||||
config::compute_ctl_temp_override_remove(pgdata_path)?;
|
||||
Ok(())
|
||||
},
|
||||
)?;
|
||||
self.pg_reload_conf()?;
|
||||
}
|
||||
self.post_apply_config()?;
|
||||
|
||||

@@ -131,18 +131,17 @@ pub fn write_postgres_conf(
Ok(())
}

/// create file compute_ctl_temp_override.conf in pgdata_dir
/// add provided options to this file
pub fn compute_ctl_temp_override_create(pgdata_path: &Path, options: &str) -> Result<()> {
pub fn with_compute_ctl_tmp_override<F>(pgdata_path: &Path, options: &str, exec: F) -> Result<()>
where
F: FnOnce() -> Result<()>,
{
let path = pgdata_path.join("compute_ctl_temp_override.conf");
let mut file = File::create(path)?;
write!(file, "{}", options)?;
Ok(())
}

/// remove file compute_ctl_temp_override.conf in pgdata_dir
pub fn compute_ctl_temp_override_remove(pgdata_path: &Path) -> Result<()> {
let path = pgdata_path.join("compute_ctl_temp_override.conf");
std::fs::remove_file(path)?;
Ok(())
let res = exec();

file.set_len(0)?;

res
}
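
The hunk above folds `compute_ctl_temp_override_create`/`_remove` into a single closure-scoped helper, so the temporary override file is always neutralized even when the wrapped work fails; the compute.rs hunks earlier in this comparison switch their callers to it. A generic, self-contained sketch of the same pattern, using only the standard library rather than Neon's actual types:

use std::fs::File;
use std::io::Write;
use std::path::Path;

// Sketch of the closure-scoped override pattern introduced above; names and
// the error type are simplified assumptions, not the compute_ctl implementation.
fn with_tmp_override<F>(pgdata_path: &Path, options: &str, exec: F) -> std::io::Result<()>
where
    F: FnOnce() -> std::io::Result<()>,
{
    let path = pgdata_path.join("compute_ctl_temp_override.conf");
    let mut file = File::create(&path)?;
    write!(file, "{}", options)?;

    // Run the caller's work while the override is in place.
    let res = exec();

    // Always empty the override file again, whether or not `exec` succeeded.
    file.set_len(0)?;

    res
}

fn main() -> std::io::Result<()> {
    with_tmp_override(Path::new("/tmp"), "neon.max_cluster_size=-1", || {
        // ... reload the postgres config and apply changes here ...
        Ok(())
    })
}
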
@@ -17,7 +17,7 @@ use hyper::header::CONTENT_TYPE;
|
||||
use hyper::service::{make_service_fn, service_fn};
|
||||
use hyper::{Body, Method, Request, Response, Server, StatusCode};
|
||||
use tokio::task;
|
||||
use tracing::{error, info, warn};
|
||||
use tracing::{debug, error, info, warn};
|
||||
use tracing_utils::http::OtelName;
|
||||
use utils::http::request::must_get_query_param;
|
||||
|
||||
@@ -48,7 +48,7 @@ async fn routes(req: Request<Body>, compute: &Arc<ComputeNode>) -> Response<Body
|
||||
match (req.method(), req.uri().path()) {
|
||||
// Serialized compute state.
|
||||
(&Method::GET, "/status") => {
|
||||
info!("serving /status GET request");
|
||||
debug!("serving /status GET request");
|
||||
let state = compute.state.lock().unwrap();
|
||||
let status_response = status_response_from_state(&state);
|
||||
Response::new(Body::from(serde_json::to_string(&status_response).unwrap()))
|
||||
|
||||
@@ -4,10 +4,6 @@ version = "0.1.0"
|
||||
edition.workspace = true
|
||||
license.workspace = true
|
||||
|
||||
[features]
|
||||
default = []
|
||||
testing = [ "pageserver_api/testing" ]
|
||||
|
||||
[dependencies]
|
||||
anyhow.workspace = true
|
||||
async-trait.workspace = true
|
||||
|
||||
@@ -383,12 +383,6 @@ impl PageServerNode {
|
||||
.map(|x| x.parse::<AuxFilePolicy>())
|
||||
.transpose()
|
||||
.context("Failed to parse 'switch_aux_file_policy'")?,
|
||||
#[cfg(feature = "testing")]
|
||||
test_vm_bit_debug_logging: settings
|
||||
.remove("test_vm_bit_debug_logging")
|
||||
.map(|x| x.parse::<bool>())
|
||||
.transpose()
|
||||
.context("Failed to parse 'test_vm_bit_debug_logging' as bool")?,
|
||||
};
|
||||
if !settings.is_empty() {
|
||||
bail!("Unrecognized tenant settings: {settings:?}")
|
||||
@@ -512,12 +506,6 @@ impl PageServerNode {
|
||||
.map(|x| x.parse::<AuxFilePolicy>())
|
||||
.transpose()
|
||||
.context("Failed to parse 'switch_aux_file_policy'")?,
|
||||
#[cfg(feature = "testing")]
|
||||
test_vm_bit_debug_logging: settings
|
||||
.remove("test_vm_bit_debug_logging")
|
||||
.map(|x| x.parse::<bool>())
|
||||
.transpose()
|
||||
.context("Failed to parse 'test_vm_bit_debug_logging' as bool")?,
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
@@ -4,10 +4,6 @@ version = "0.1.0"
|
||||
edition.workspace = true
|
||||
license.workspace = true
|
||||
|
||||
[features]
|
||||
default = []
|
||||
testing = []
|
||||
|
||||
[dependencies]
|
||||
serde.workspace = true
|
||||
serde_with.workspace = true
|
||||
|
||||
@@ -558,6 +558,12 @@ impl KeySpaceRandomAccum {
|
||||
self.ranges.push(range);
|
||||
}
|
||||
|
||||
pub fn add_keyspace(&mut self, keyspace: KeySpace) {
|
||||
for range in keyspace.ranges {
|
||||
self.add_range(range);
|
||||
}
|
||||
}
|
||||
|
||||
pub fn to_keyspace(mut self) -> KeySpace {
|
||||
let mut ranges = Vec::new();
|
||||
if !self.ranges.is_empty() {
|
||||
|
||||

@@ -322,8 +322,6 @@ pub struct TenantConfig {
pub timeline_get_throttle: Option<ThrottleConfig>,
pub image_layer_creation_check_threshold: Option<u8>,
pub switch_aux_file_policy: Option<AuxFilePolicy>,
#[cfg(feature = "testing")]
pub test_vm_bit_debug_logging: Option<bool>,
}

/// The policy for the aux file storage. It can be switched through `switch_aux_file_policy`
@@ -457,6 +455,26 @@ pub enum CompactionAlgorithm {
Tiered,
}

#[derive(
Debug,
Clone,
Copy,
PartialEq,
Eq,
Serialize,
Deserialize,
strum_macros::FromRepr,
strum_macros::EnumString,
enum_map::Enum,
)]
#[strum(serialize_all = "kebab-case")]
pub enum ImageCompressionAlgorithm {
ZstdLow,
Zstd,
ZstdHigh,
LZ4,
}

#[derive(Eq, PartialEq, Debug, Clone, Serialize, Deserialize)]
pub struct CompactionAlgorithmSettings {
pub kind: CompactionAlgorithm,
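
The new `ImageCompressionAlgorithm` enum derives `strum_macros::EnumString` with `serialize_all = "kebab-case"`, which is what lets the `image_compression` pageserver config option later in this diff be parsed from a string. A hedged sketch of that behaviour (the exact spelling strum picks for the `LZ4` variant is an assumption, so only a multi-word variant is shown):

use std::str::FromStr;

// Sketch only: assumes the enum and derives shown above are in scope; this is
// not a test from the repository.
fn parse_examples() {
    // The kebab-case spelling of a multi-word variant parses successfully ...
    assert_eq!(
        ImageCompressionAlgorithm::from_str("zstd-low"),
        Ok(ImageCompressionAlgorithm::ZstdLow)
    );

    // ... while an unsupported name is rejected, which is how a bad
    // `image_compression` value in the config would surface.
    assert!(ImageCompressionAlgorithm::from_str("gzip").is_err());
}
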
@@ -7,7 +7,7 @@ license.workspace = true
|
||||
[dependencies]
|
||||
hyper.workspace = true
|
||||
opentelemetry = { workspace = true, features=["rt-tokio"] }
|
||||
opentelemetry-otlp = { workspace = true, default_features=false, features = ["http-proto", "trace", "http", "reqwest-client"] }
|
||||
opentelemetry-otlp = { workspace = true, default-features=false, features = ["http-proto", "trace", "http", "reqwest-client"] }
|
||||
opentelemetry-semantic-conventions.workspace = true
|
||||
reqwest = { workspace = true, default-features = false, features = ["rustls-tls"] }
|
||||
tokio = { workspace = true, features = ["rt", "rt-multi-thread"] }
|
||||
|
||||
@@ -25,6 +25,8 @@ pub struct Config {
|
||||
///
|
||||
/// For simplicity, this value must be greater than or equal to `memory_history_len`.
|
||||
memory_history_log_interval: usize,
|
||||
/// The max number of iterations to skip before logging the next iteration
|
||||
memory_history_log_noskip_interval: Duration,
|
||||
}
|
||||
|
||||
impl Default for Config {
|
||||
@@ -33,6 +35,7 @@ impl Default for Config {
|
||||
memory_poll_interval: Duration::from_millis(100),
|
||||
memory_history_len: 5, // use 500ms of history for decision-making
|
||||
memory_history_log_interval: 20, // but only log every ~2s (otherwise it's spammy)
|
||||
memory_history_log_noskip_interval: Duration::from_secs(15), // but only if it's changed, or 60 seconds have passed
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -85,7 +88,12 @@ impl CgroupWatcher {
|
||||
|
||||
// buffer for samples that will be logged. once full, it remains so.
|
||||
let history_log_len = self.config.memory_history_log_interval;
|
||||
let max_skip = self.config.memory_history_log_noskip_interval;
|
||||
let mut history_log_buf = vec![MemoryStatus::zeroed(); history_log_len];
|
||||
let mut last_logged_memusage = MemoryStatus::zeroed();
|
||||
|
||||
// Ensure that we're tracking a value that's definitely in the past, as Instant::now is only guaranteed to be non-decreasing on Rust's T1-supported systems.
|
||||
let mut can_skip_logs_until = Instant::now() - max_skip;
|
||||
|
||||
for t in 0_u64.. {
|
||||
ticker.tick().await;
|
||||
@@ -115,12 +123,24 @@ impl CgroupWatcher {
|
||||
// equal to the logging interval, we can just log the entire buffer every time we set
|
||||
// the last entry, which also means that for this log line, we can ignore that it's a
|
||||
// ring buffer (because all the entries are in order of increasing time).
|
||||
if i == history_log_len - 1 {
|
||||
//
|
||||
// We skip logging the data if data hasn't meaningfully changed in a while, unless
|
||||
// we've already ignored previous iterations for the last max_skip period.
|
||||
if i == history_log_len - 1
|
||||
&& (now > can_skip_logs_until
|
||||
|| !history_log_buf
|
||||
.iter()
|
||||
.all(|usage| last_logged_memusage.status_is_close_or_similar(usage)))
|
||||
{
|
||||
info!(
|
||||
history = ?MemoryStatus::debug_slice(&history_log_buf),
|
||||
summary = ?summary,
|
||||
"Recent cgroup memory statistics history"
|
||||
);
|
||||
|
||||
can_skip_logs_until = now + max_skip;
|
||||
|
||||
last_logged_memusage = *history_log_buf.last().unwrap();
|
||||
}
|
||||
|
||||
updates
|
||||

@@ -232,6 +252,24 @@ impl MemoryStatus {

DS(slice)
}

/// Check if the other memory status is a close or similar result.
/// Returns true if the larger value is not larger than the smaller value
/// by 1/8 of the smaller value, and within 128MiB.
/// See tests::check_similarity_behaviour for examples of behaviour
fn status_is_close_or_similar(&self, other: &MemoryStatus) -> bool {
let margin;
let diff;
if self.non_reclaimable >= other.non_reclaimable {
margin = other.non_reclaimable / 8;
diff = self.non_reclaimable - other.non_reclaimable;
} else {
margin = self.non_reclaimable / 8;
diff = other.non_reclaimable - self.non_reclaimable;
}

diff < margin && diff < 128 * 1024 * 1024
}
}
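
To make the 1/8-plus-128MiB rule above concrete with the numbers the test below uses: for non_reclaimable readings of 64 and 72, the margin is 64 / 8 = 8 and the difference is also 8, so `diff < margin` fails and the two statuses are not similar; at 64 versus 71 the difference is 7 < 8 and they are similar. The second clause caps the allowance at 128 MiB, so two terabyte-scale readings that differ by 128 MiB or more are never considered similar even though an eighth of either reading would be far larger.
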
#[cfg(test)]
|
||||
@@ -261,4 +299,65 @@ mod tests {
|
||||
assert_eq!(values(2, 4), [9, 0, 1, 2]);
|
||||
assert_eq!(values(2, 10), [3, 4, 5, 6, 7, 8, 9, 0, 1, 2]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn check_similarity_behaviour() {
|
||||
// This all accesses private methods, so we can't actually run this
|
||||
// as doctests, because doctests run as an external crate.
|
||||
let mut small = super::MemoryStatus {
|
||||
non_reclaimable: 1024,
|
||||
};
|
||||
let mut large = super::MemoryStatus {
|
||||
non_reclaimable: 1024 * 1024 * 1024 * 1024,
|
||||
};
|
||||
|
||||
// objects are self-similar, no matter the size
|
||||
assert!(small.status_is_close_or_similar(&small));
|
||||
assert!(large.status_is_close_or_similar(&large));
|
||||
|
||||
// inequality is symmetric
|
||||
assert!(!small.status_is_close_or_similar(&large));
|
||||
assert!(!large.status_is_close_or_similar(&small));
|
||||
|
||||
small.non_reclaimable = 64;
|
||||
large.non_reclaimable = (small.non_reclaimable / 8) * 9;
|
||||
|
||||
// objects are self-similar, no matter the size
|
||||
assert!(small.status_is_close_or_similar(&small));
|
||||
assert!(large.status_is_close_or_similar(&large));
|
||||
|
||||
// values are similar if the larger value is larger by less than
|
||||
// 12.5%, i.e. 1/8 of the smaller value.
|
||||
// In the example above, large is exactly 12.5% larger, so this doesn't
|
||||
// match.
|
||||
assert!(!small.status_is_close_or_similar(&large));
|
||||
assert!(!large.status_is_close_or_similar(&small));
|
||||
|
||||
large.non_reclaimable -= 1;
|
||||
assert!(large.status_is_close_or_similar(&large));
|
||||
|
||||
assert!(small.status_is_close_or_similar(&large));
|
||||
assert!(large.status_is_close_or_similar(&small));
|
||||
|
||||
// The 1/8 rule only applies up to 128MiB of difference
|
||||
small.non_reclaimable = 1024 * 1024 * 1024 * 1024;
|
||||
large.non_reclaimable = small.non_reclaimable / 8 * 9;
|
||||
assert!(small.status_is_close_or_similar(&small));
|
||||
assert!(large.status_is_close_or_similar(&large));
|
||||
|
||||
assert!(!small.status_is_close_or_similar(&large));
|
||||
assert!(!large.status_is_close_or_similar(&small));
|
||||
// the large value is put just above the threshold
|
||||
large.non_reclaimable = small.non_reclaimable + 128 * 1024 * 1024;
|
||||
assert!(large.status_is_close_or_similar(&large));
|
||||
|
||||
assert!(!small.status_is_close_or_similar(&large));
|
||||
assert!(!large.status_is_close_or_similar(&small));
|
||||
// now below
|
||||
large.non_reclaimable -= 1;
|
||||
assert!(large.status_is_close_or_similar(&large));
|
||||
|
||||
assert!(small.status_is_close_or_similar(&large));
|
||||
assert!(large.status_is_close_or_similar(&small));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -12,11 +12,11 @@ use futures::{
|
||||
stream::{SplitSink, SplitStream},
|
||||
SinkExt, StreamExt,
|
||||
};
|
||||
use tracing::info;
|
||||
use tracing::{debug, info};
|
||||
|
||||
use crate::protocol::{
|
||||
OutboundMsg, ProtocolRange, ProtocolResponse, ProtocolVersion, PROTOCOL_MAX_VERSION,
|
||||
PROTOCOL_MIN_VERSION,
|
||||
OutboundMsg, OutboundMsgKind, ProtocolRange, ProtocolResponse, ProtocolVersion,
|
||||
PROTOCOL_MAX_VERSION, PROTOCOL_MIN_VERSION,
|
||||
};
|
||||
|
||||
/// The central handler for all communications in the monitor.
|
||||
@@ -118,7 +118,12 @@ impl Dispatcher {
|
||||
/// serialize the wrong thing and send it, since `self.sink.send` will take
|
||||
/// any string.
|
||||
pub async fn send(&mut self, message: OutboundMsg) -> anyhow::Result<()> {
|
||||
info!(?message, "sending message");
|
||||
if matches!(&message.inner, OutboundMsgKind::HealthCheck { .. }) {
|
||||
debug!(?message, "sending message");
|
||||
} else {
|
||||
info!(?message, "sending message");
|
||||
}
|
||||
|
||||
let json = serde_json::to_string(&message).context("failed to serialize message")?;
|
||||
self.sink
|
||||
.send(Message::Text(json))
|
||||
|
||||
@@ -12,7 +12,7 @@ use axum::extract::ws::{Message, WebSocket};
|
||||
use futures::StreamExt;
|
||||
use tokio::sync::{broadcast, watch};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{error, info, warn};
|
||||
use tracing::{debug, error, info, warn};
|
||||
|
||||
use crate::cgroup::{self, CgroupWatcher};
|
||||
use crate::dispatcher::Dispatcher;
|
||||
@@ -474,26 +474,29 @@ impl Runner {
|
||||
// there is a message from the agent
|
||||
msg = self.dispatcher.source.next() => {
|
||||
if let Some(msg) = msg {
|
||||
// Don't use 'message' as a key as the string also uses
|
||||
// that for its key
|
||||
info!(?msg, "received message");
|
||||
match msg {
|
||||
match &msg {
|
||||
Ok(msg) => {
|
||||
let message: InboundMsg = match msg {
|
||||
Message::Text(text) => {
|
||||
serde_json::from_str(&text).context("failed to deserialize text message")?
|
||||
serde_json::from_str(text).context("failed to deserialize text message")?
|
||||
}
|
||||
other => {
|
||||
warn!(
|
||||
// Don't use 'message' as a key as the
|
||||
// string also uses that for its key
|
||||
msg = ?other,
|
||||
"agent should only send text messages but received different type"
|
||||
"problem processing incoming message: agent should only send text messages but received different type"
|
||||
);
|
||||
continue
|
||||
},
|
||||
};
|
||||
|
||||
if matches!(&message.inner, InboundMsgKind::HealthCheck { .. }) {
|
||||
debug!(?msg, "received message");
|
||||
} else {
|
||||
info!(?msg, "received message");
|
||||
}
|
||||
|
||||
let out = match self.process_message(message.clone()).await {
|
||||
Ok(Some(out)) => out,
|
||||
Ok(None) => continue,
|
||||
@@ -517,7 +520,11 @@ impl Runner {
|
||||
.await
|
||||
.context("failed to send message")?;
|
||||
}
|
||||
Err(e) => warn!("{e}"),
|
||||
Err(e) => warn!(
|
||||
error = format!("{e}"),
|
||||
msg = ?msg,
|
||||
"received error message"
|
||||
),
|
||||
}
|
||||
} else {
|
||||
anyhow::bail!("dispatcher connection closed")
|
||||
|
||||
@@ -8,7 +8,7 @@ license.workspace = true
|
||||
default = []
|
||||
# Enables test-only APIs, incuding failpoints. In particular, enables the `fail_point!` macro,
|
||||
# which adds some runtime cost to run tests on outage conditions
|
||||
testing = ["fail/failpoints", "pageserver_api/testing"]
|
||||
testing = ["fail/failpoints"]
|
||||
|
||||
[dependencies]
|
||||
anyhow.workspace = true
|
||||
@@ -37,6 +37,7 @@ humantime-serde.workspace = true
|
||||
hyper.workspace = true
|
||||
itertools.workspace = true
|
||||
leaky-bucket.workspace = true
|
||||
lz4_flex.workspace = true
|
||||
md5.workspace = true
|
||||
nix.workspace = true
|
||||
# hack to get the number of worker threads tokio uses
|
||||
|
||||
@@ -1,4 +1,7 @@
|
||||
use std::num::NonZeroU32;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::str::FromStr;
|
||||
use std::sync::Arc;
|
||||
|
||||
use anyhow::Result;
|
||||
use camino::{Utf8Path, Utf8PathBuf};
|
||||
@@ -8,7 +11,7 @@ use pageserver::task_mgr::TaskKind;
|
||||
use pageserver::tenant::block_io::BlockCursor;
|
||||
use pageserver::tenant::disk_btree::DiskBtreeReader;
|
||||
use pageserver::tenant::storage_layer::delta_layer::{BlobRef, Summary};
|
||||
use pageserver::tenant::storage_layer::{delta_layer, image_layer};
|
||||
use pageserver::tenant::storage_layer::{delta_layer, image_layer, LayerName};
|
||||
use pageserver::tenant::storage_layer::{DeltaLayer, ImageLayer};
|
||||
use pageserver::tenant::{TENANTS_SEGMENT_NAME, TIMELINES_SEGMENT_NAME};
|
||||
use pageserver::{page_cache, virtual_file};
|
||||
@@ -20,7 +23,12 @@ use pageserver::{
|
||||
},
|
||||
virtual_file::VirtualFile,
|
||||
};
|
||||
use pageserver_api::models::ImageCompressionAlgorithm;
|
||||
use remote_storage::{GenericRemoteStorage, ListingMode, RemotePath, RemoteStorageConfig};
|
||||
use std::fs;
|
||||
use tokio::sync::Semaphore;
|
||||
use tokio::task::JoinSet;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use utils::bin_ser::BeSer;
|
||||
use utils::id::{TenantId, TimelineId};
|
||||
|
||||
@@ -55,6 +63,17 @@ pub(crate) enum LayerCmd {
|
||||
#[clap(long)]
|
||||
new_timeline_id: Option<TimelineId>,
|
||||
},
|
||||
CompressOne {
|
||||
dest_path: Utf8PathBuf,
|
||||
layer_file_path: Utf8PathBuf,
|
||||
},
|
||||
CompressMany {
|
||||
tmp_dir: Utf8PathBuf,
|
||||
tenant_remote_prefix: String,
|
||||
tenant_remote_config: String,
|
||||
layers_dir: Utf8PathBuf,
|
||||
parallelism: Option<u32>,
|
||||
},
|
||||
}
|
||||
|
||||
async fn read_delta_file(path: impl AsRef<Path>, ctx: &RequestContext) -> Result<()> {
|
||||
@@ -240,5 +259,138 @@ pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> {
|
||||
|
||||
anyhow::bail!("not an image or delta layer: {layer_file_path}");
|
||||
}
|
||||
LayerCmd::CompressOne {
|
||||
dest_path,
|
||||
layer_file_path,
|
||||
} => {
|
||||
pageserver::virtual_file::init(10, virtual_file::api::IoEngineKind::StdFs);
|
||||
pageserver::page_cache::init(100);
|
||||
|
||||
let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);
|
||||
|
||||
let stats =
|
||||
ImageLayer::compression_statistics(dest_path, layer_file_path, &ctx).await?;
|
||||
println!(
|
||||
"Statistics: {stats:#?}\n{}",
|
||||
serde_json::to_string(&stats).unwrap()
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
LayerCmd::CompressMany {
|
||||
tmp_dir,
|
||||
tenant_remote_prefix,
|
||||
tenant_remote_config,
|
||||
layers_dir,
|
||||
parallelism,
|
||||
} => {
|
||||
pageserver::virtual_file::init(10, virtual_file::api::IoEngineKind::StdFs);
|
||||
pageserver::page_cache::init(100);
|
||||
|
||||
let toml_document = toml_edit::Document::from_str(tenant_remote_config)?;
|
||||
let toml_item = toml_document
|
||||
.get("remote_storage")
|
||||
.expect("need remote_storage");
|
||||
let config = RemoteStorageConfig::from_toml(toml_item)?.expect("incomplete config");
|
||||
let storage = remote_storage::GenericRemoteStorage::from_config(&config)?;
|
||||
let storage = Arc::new(storage);
|
||||
|
||||
let cancel = CancellationToken::new();
|
||||
let path = RemotePath::from_string(tenant_remote_prefix)?;
|
||||
let max_files = NonZeroU32::new(128_000);
|
||||
let files_list = storage
|
||||
.list(Some(&path), ListingMode::NoDelimiter, max_files, &cancel)
|
||||
.await?;
|
||||
|
||||
println!("Listing gave {} keys", files_list.keys.len());
|
||||
|
||||
tokio::fs::create_dir_all(&layers_dir).await?;
|
||||
|
||||
let semaphore = Arc::new(Semaphore::new(parallelism.unwrap_or(1) as usize));
|
||||
|
||||
let mut tasks = JoinSet::new();
|
||||
for (file_idx, file_key) in files_list.keys.iter().enumerate() {
|
||||
let Some(file_name) = file_key.object_name() else {
|
||||
continue;
|
||||
};
|
||||
match LayerName::from_str(file_name) {
|
||||
Ok(LayerName::Delta(_)) => continue,
|
||||
Ok(LayerName::Image(_)) => (),
|
||||
Err(_e) => {
|
||||
// Split off the final part. We ensured above that this is not turning a
|
||||
// generation-less delta layer file name into an image layer file name.
|
||||
let Some(file_without_generation) = file_name.rsplit_once('-') else {
|
||||
continue;
|
||||
};
|
||||
let Ok(LayerName::Image(_layer_file_name)) =
|
||||
LayerName::from_str(file_without_generation.0)
|
||||
else {
|
||||
// Skipping because it's either not a layer or an image layer
|
||||
//println!("object {file_name}: not an image layer");
|
||||
continue;
|
||||
};
|
||||
}
|
||||
}
|
||||
let json_file_path = layers_dir.join(format!("{file_name}.json"));
|
||||
if tokio::fs::try_exists(&json_file_path).await? {
|
||||
//println!("object {file_name}: report already created");
|
||||
// If we have already created a report for the layer, skip it.
|
||||
continue;
|
||||
}
|
||||
let local_layer_path = layers_dir.join(file_name);
|
||||
async fn stats(
|
||||
semaphore: Arc<Semaphore>,
|
||||
local_layer_path: Utf8PathBuf,
|
||||
json_file_path: Utf8PathBuf,
|
||||
tmp_dir: Utf8PathBuf,
|
||||
storage: Arc<GenericRemoteStorage>,
|
||||
file_key: RemotePath,
|
||||
) -> Result<Vec<(Option<ImageCompressionAlgorithm>, u64, u64, u64)>, anyhow::Error>
|
||||
{
|
||||
let _permit = semaphore.acquire().await?;
|
||||
let cancel = CancellationToken::new();
|
||||
let download = storage.download(&file_key, &cancel).await?;
|
||||
let mut dest_layer_file = tokio::fs::File::create(&local_layer_path).await?;
|
||||
let mut body = tokio_util::io::StreamReader::new(download.download_stream);
|
||||
let _size = tokio::io::copy_buf(&mut body, &mut dest_layer_file).await?;
|
||||
println!("Downloaded file to {local_layer_path}");
|
||||
let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);
|
||||
let stats =
|
||||
ImageLayer::compression_statistics(&tmp_dir, &local_layer_path, &ctx)
|
||||
.await?;
|
||||
|
||||
let stats_str = serde_json::to_string(&stats).unwrap();
|
||||
tokio::fs::write(json_file_path, stats_str).await?;
|
||||
|
||||
tokio::fs::remove_file(&local_layer_path).await?;
|
||||
Ok(stats)
|
||||
}
|
||||
let semaphore = semaphore.clone();
|
||||
let file_key = file_key.to_owned();
|
||||
let storage = storage.clone();
|
||||
let tmp_dir = tmp_dir.to_owned();
|
||||
let file_name = file_name.to_owned();
|
||||
let percent = (file_idx * 100) as f64 / files_list.keys.len() as f64;
|
||||
tasks.spawn(async move {
|
||||
let stats = stats(
|
||||
semaphore,
|
||||
local_layer_path.to_owned(),
|
||||
json_file_path.to_owned(),
|
||||
tmp_dir,
|
||||
storage,
|
||||
file_key,
|
||||
)
|
||||
.await;
|
||||
match stats {
|
||||
Ok(stats) => {
|
||||
println!("Statistics for {file_name} ({percent:.1}%): {stats:?}\n")
|
||||
}
|
||||
Err(e) => eprintln!("Error for {file_name}: {e:?}"),
|
||||
};
|
||||
});
|
||||
}
|
||||
while let Some(_res) = tasks.join_next().await {}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -5,7 +5,7 @@
|
||||
//! See also `settings.md` for better description on every parameter.
|
||||
|
||||
use anyhow::{anyhow, bail, ensure, Context, Result};
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use pageserver_api::{models::ImageCompressionAlgorithm, shard::TenantShardId};
|
||||
use remote_storage::{RemotePath, RemoteStorageConfig};
|
||||
use serde;
|
||||
use serde::de::IntoDeserializer;
|
||||
@@ -55,6 +55,7 @@ pub mod defaults {
|
||||
DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_HTTP_LISTEN_PORT, DEFAULT_PG_LISTEN_ADDR,
|
||||
DEFAULT_PG_LISTEN_PORT,
|
||||
};
|
||||
use pageserver_api::models::ImageCompressionAlgorithm;
|
||||
pub use storage_broker::DEFAULT_ENDPOINT as BROKER_DEFAULT_ENDPOINT;
|
||||
|
||||
pub const DEFAULT_WAIT_LSN_TIMEOUT: &str = "60 s";
|
||||
@@ -95,6 +96,8 @@ pub mod defaults {
|
||||
|
||||
pub const DEFAULT_MAX_VECTORED_READ_BYTES: usize = 128 * 1024; // 128 KiB
|
||||
|
||||
pub const DEFAULT_IMAGE_COMPRESSION: Option<ImageCompressionAlgorithm> = None;
|
||||
|
||||
pub const DEFAULT_VALIDATE_VECTORED_GET: bool = true;
|
||||
|
||||
pub const DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB: usize = 0;
|
||||
@@ -290,6 +293,8 @@ pub struct PageServerConf {
|
||||
|
||||
pub validate_vectored_get: bool,
|
||||
|
||||
pub image_compression: Option<ImageCompressionAlgorithm>,
|
||||
|
||||
/// How many bytes of ephemeral layer content will we allow per kilobyte of RAM. When this
|
||||
/// is exceeded, we start proactively closing ephemeral layers to limit the total amount
|
||||
/// of ephemeral data.
|
||||
@@ -400,6 +405,8 @@ struct PageServerConfigBuilder {
|
||||
|
||||
validate_vectored_get: BuilderValue<bool>,
|
||||
|
||||
image_compression: BuilderValue<Option<ImageCompressionAlgorithm>>,
|
||||
|
||||
ephemeral_bytes_per_memory_kb: BuilderValue<usize>,
|
||||
}
|
||||
|
||||
@@ -487,6 +494,7 @@ impl PageServerConfigBuilder {
|
||||
max_vectored_read_bytes: Set(MaxVectoredReadBytes(
|
||||
NonZeroUsize::new(DEFAULT_MAX_VECTORED_READ_BYTES).unwrap(),
|
||||
)),
|
||||
image_compression: Set(DEFAULT_IMAGE_COMPRESSION),
|
||||
validate_vectored_get: Set(DEFAULT_VALIDATE_VECTORED_GET),
|
||||
ephemeral_bytes_per_memory_kb: Set(DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB),
|
||||
}
|
||||
@@ -672,6 +680,10 @@ impl PageServerConfigBuilder {
|
||||
self.validate_vectored_get = BuilderValue::Set(value);
|
||||
}
|
||||
|
||||
pub fn get_image_compression(&mut self, value: Option<ImageCompressionAlgorithm>) {
|
||||
self.image_compression = BuilderValue::Set(value);
|
||||
}
|
||||
|
||||
pub fn get_ephemeral_bytes_per_memory_kb(&mut self, value: usize) {
|
||||
self.ephemeral_bytes_per_memory_kb = BuilderValue::Set(value);
|
||||
}
|
||||
@@ -732,6 +744,7 @@ impl PageServerConfigBuilder {
|
||||
get_impl,
|
||||
max_vectored_read_bytes,
|
||||
validate_vectored_get,
|
||||
image_compression,
|
||||
ephemeral_bytes_per_memory_kb,
|
||||
}
|
||||
CUSTOM LOGIC
|
||||
@@ -1026,6 +1039,9 @@ impl PageServerConf {
|
||||
"validate_vectored_get" => {
|
||||
builder.get_validate_vectored_get(parse_toml_bool("validate_vectored_get", item)?)
|
||||
}
|
||||
"image_compression" => {
|
||||
builder.get_image_compression(Some(parse_toml_from_str("image_compression", item)?))
|
||||
}
|
||||
"ephemeral_bytes_per_memory_kb" => {
|
||||
builder.get_ephemeral_bytes_per_memory_kb(parse_toml_u64("ephemeral_bytes_per_memory_kb", item)? as usize)
|
||||
}
|
||||
@@ -1110,6 +1126,7 @@ impl PageServerConf {
|
||||
NonZeroUsize::new(defaults::DEFAULT_MAX_VECTORED_READ_BYTES)
|
||||
.expect("Invalid default constant"),
|
||||
),
|
||||
image_compression: defaults::DEFAULT_IMAGE_COMPRESSION,
|
||||
validate_vectored_get: defaults::DEFAULT_VALIDATE_VECTORED_GET,
|
||||
ephemeral_bytes_per_memory_kb: defaults::DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB,
|
||||
}
|
||||
@@ -1350,6 +1367,7 @@ background_task_maximum_delay = '334 s'
|
||||
.expect("Invalid default constant")
|
||||
),
|
||||
validate_vectored_get: defaults::DEFAULT_VALIDATE_VECTORED_GET,
|
||||
image_compression: defaults::DEFAULT_IMAGE_COMPRESSION,
|
||||
ephemeral_bytes_per_memory_kb: defaults::DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB,
|
||||
},
|
||||
"Correct defaults should be used when no config values are provided"
|
||||
@@ -1423,6 +1441,7 @@ background_task_maximum_delay = '334 s'
|
||||
.expect("Invalid default constant")
|
||||
),
|
||||
validate_vectored_get: defaults::DEFAULT_VALIDATE_VECTORED_GET,
|
||||
image_compression: defaults::DEFAULT_IMAGE_COMPRESSION,
|
||||
ephemeral_bytes_per_memory_kb: defaults::DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB,
|
||||
},
|
||||
"Should be able to parse all basic config values correctly"
|
||||
|
||||
@@ -2,10 +2,9 @@
|
||||
//! and push them to a HTTP endpoint.
|
||||
use crate::context::{DownloadBehavior, RequestContext};
|
||||
use crate::task_mgr::{self, TaskKind, BACKGROUND_RUNTIME};
|
||||
use crate::tenant::size::CalculateSyntheticSizeError;
|
||||
use crate::tenant::tasks::BackgroundLoopKind;
|
||||
use crate::tenant::{
|
||||
mgr::TenantManager, LogicalSizeCalculationCause, PageReconstructError, Tenant,
|
||||
};
|
||||
use crate::tenant::{mgr::TenantManager, LogicalSizeCalculationCause, Tenant};
|
||||
use camino::Utf8PathBuf;
|
||||
use consumption_metrics::EventType;
|
||||
use pageserver_api::models::TenantState;
|
||||
@@ -350,19 +349,12 @@ async fn calculate_and_log(tenant: &Tenant, cancel: &CancellationToken, ctx: &Re
|
||||
// Same for the loop that fetches computed metrics.
|
||||
// By using the same limiter, we centralize metrics collection for "start" and "finished" counters,
|
||||
// which turns out is really handy to understand the system.
|
||||
let Err(e) = tenant.calculate_synthetic_size(CAUSE, cancel, ctx).await else {
|
||||
return;
|
||||
};
|
||||
|
||||
// this error can be returned if timeline is shutting down, but it does not
|
||||
// mean the synthetic size worker should terminate.
|
||||
let shutting_down = matches!(
|
||||
e.downcast_ref::<PageReconstructError>(),
|
||||
Some(PageReconstructError::Cancelled)
|
||||
);
|
||||
|
||||
if !shutting_down {
|
||||
let tenant_shard_id = tenant.tenant_shard_id();
|
||||
error!("failed to calculate synthetic size for tenant {tenant_shard_id}: {e:#}");
|
||||
match tenant.calculate_synthetic_size(CAUSE, cancel, ctx).await {
|
||||
Ok(_) => {}
|
||||
Err(CalculateSyntheticSizeError::Cancelled) => {}
|
||||
Err(e) => {
|
||||
let tenant_shard_id = tenant.tenant_shard_id();
|
||||
error!("failed to calculate synthetic size for tenant {tenant_shard_id}: {e:#}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1135,7 +1135,10 @@ async fn tenant_size_handler(
|
||||
&ctx,
|
||||
)
|
||||
.await
|
||||
.map_err(ApiError::InternalServerError)?;
|
||||
.map_err(|e| match e {
|
||||
crate::tenant::size::CalculateSyntheticSizeError::Cancelled => ApiError::ShuttingDown,
|
||||
other => ApiError::InternalServerError(anyhow::anyhow!(other)),
|
||||
})?;
|
||||
|
||||
let mut sizes = None;
|
||||
let accepts_html = headers
|
||||
@@ -1143,9 +1146,7 @@ async fn tenant_size_handler(
|
||||
.map(|v| v == "text/html")
|
||||
.unwrap_or_default();
|
||||
if !inputs_only.unwrap_or(false) {
|
||||
let storage_model = inputs
|
||||
.calculate_model()
|
||||
.map_err(ApiError::InternalServerError)?;
|
||||
let storage_model = inputs.calculate_model();
|
||||
let size = storage_model.calculate();
|
||||
|
||||
// If request header expects html, return html
|
||||
|
||||
@@ -509,11 +509,24 @@ pub(crate) enum GcError {
#[error(transparent)]
Remote(anyhow::Error),

// An error reading while calculating GC cutoffs
#[error(transparent)]
GcCutoffs(PageReconstructError),

// If GC was invoked for a particular timeline, this error means it didn't exist
#[error("timeline not found")]
TimelineNotFound,
}

impl From<PageReconstructError> for GcError {
fn from(value: PageReconstructError) -> Self {
match value {
PageReconstructError::Cancelled => Self::TimelineCancelled,
other => Self::GcCutoffs(other),
}
}
}

impl Tenant {
/// Yet another helper for timeline initialization.
///
@@ -1033,7 +1046,6 @@ impl Tenant {
remote_metadata,
TimelineResources {
remote_client,
deletion_queue_client: self.deletion_queue_client.clone(),
timeline_get_throttle: self.timeline_get_throttle.clone(),
},
ctx,
@@ -1059,7 +1071,6 @@ impl Tenant {
timeline_id,
&index_part.metadata,
remote_timeline_client,
self.deletion_queue_client.clone(),
)
.instrument(tracing::info_span!("timeline_delete", %timeline_id))
.await
@@ -2921,17 +2932,9 @@ impl Tenant {
.checked_sub(horizon)
.unwrap_or(Lsn(0));

let res = timeline.find_gc_cutoffs(cutoff, pitr, cancel, ctx).await;

match res {
Ok(cutoffs) => {
let old = gc_cutoffs.insert(timeline.timeline_id, cutoffs);
assert!(old.is_none());
}
Err(e) => {
tracing::warn!(timeline_id = %timeline.timeline_id, "ignoring failure to find gc cutoffs: {e:#}");
}
}
let cutoffs = timeline.find_gc_cutoffs(cutoff, pitr, cancel, ctx).await?;
let old = gc_cutoffs.insert(timeline.timeline_id, cutoffs);
assert!(old.is_none());
}

if !self.is_active() || self.cancel.is_cancelled() {
@@ -3443,7 +3446,6 @@ impl Tenant {
);
TimelineResources {
remote_client,
deletion_queue_client: self.deletion_queue_client.clone(),
timeline_get_throttle: self.timeline_get_throttle.clone(),
}
}
@@ -3553,7 +3555,7 @@ impl Tenant {
cause: LogicalSizeCalculationCause,
cancel: &CancellationToken,
ctx: &RequestContext,
) -> anyhow::Result<size::ModelInputs> {
) -> Result<size::ModelInputs, size::CalculateSyntheticSizeError> {
let logical_sizes_at_once = self
.conf
.concurrent_tenant_size_logical_size_queries
@@ -3568,8 +3570,8 @@ impl Tenant {
// See more on issue #2748, condensed out of the initial PR review.
let mut shared_cache = tokio::select! {
locked = self.cached_logical_sizes.lock() => locked,
_ = cancel.cancelled() => anyhow::bail!("cancelled"),
_ = self.cancel.cancelled() => anyhow::bail!("tenant is shutting down"),
_ = cancel.cancelled() => return Err(size::CalculateSyntheticSizeError::Cancelled),
_ = self.cancel.cancelled() => return Err(size::CalculateSyntheticSizeError::Cancelled),
};

size::gather_inputs(
@@ -3593,10 +3595,10 @@ impl Tenant {
cause: LogicalSizeCalculationCause,
cancel: &CancellationToken,
ctx: &RequestContext,
) -> anyhow::Result<u64> {
) -> Result<u64, size::CalculateSyntheticSizeError> {
let inputs = self.gather_size_inputs(None, cause, cancel, ctx).await?;

let size = inputs.calculate()?;
let size = inputs.calculate();

self.set_cached_synthetic_size(size);

@@ -3831,8 +3833,6 @@ pub(crate) mod harness {
tenant_conf.image_layer_creation_check_threshold,
),
switch_aux_file_policy: Some(tenant_conf.switch_aux_file_policy),
#[cfg(feature = "testing")]
test_vm_bit_debug_logging: Some(tenant_conf.test_vm_bit_debug_logging),
}
}
}
@@ -4043,6 +4043,7 @@ mod tests {
use crate::repository::{Key, Value};
use crate::tenant::harness::*;
use crate::tenant::timeline::CompactFlags;
use crate::walrecord::NeonWalRecord;
use crate::DEFAULT_PG_VERSION;
use bytes::{Bytes, BytesMut};
use hex_literal::hex;
@@ -6366,7 +6367,7 @@ mod tests {
.await?;
Ok(res.pop_last().map(|(k, v)| {
assert_eq!(k, key);
v.unwrap().0
v.unwrap()
}))
}

@@ -6707,8 +6708,8 @@ mod tests {
}

#[tokio::test]
async fn test_simple_bottom_most_compaction() -> anyhow::Result<()> {
let harness = TenantHarness::create("test_simple_bottom_most_compaction")?;
async fn test_simple_bottom_most_compaction_images() -> anyhow::Result<()> {
let harness = TenantHarness::create("test_simple_bottom_most_compaction_images")?;
let (tenant, ctx) = harness.load().await;

fn get_key(id: u32) -> Key {
@@ -6863,4 +6864,79 @@ mod tests {

Ok(())
}

#[tokio::test]
async fn test_neon_test_record() -> anyhow::Result<()> {
let harness = TenantHarness::create("test_neon_test_record")?;
let (tenant, ctx) = harness.load().await;

fn get_key(id: u32) -> Key {
// using aux key here b/c they are guaranteed to be inside `collect_keyspace`.
let mut key = Key::from_hex("620000000033333333444444445500000000").unwrap();
key.field6 = id;
key
}

let delta1 = vec![
(
get_key(1),
Lsn(0x20),
Value::WalRecord(NeonWalRecord::wal_append(",0x20")),
),
(
get_key(1),
Lsn(0x30),
Value::WalRecord(NeonWalRecord::wal_append(",0x30")),
),
(get_key(2), Lsn(0x10), Value::Image("0x10".into())),
(
get_key(2),
Lsn(0x20),
Value::WalRecord(NeonWalRecord::wal_append(",0x20")),
),
(
get_key(2),
Lsn(0x30),
Value::WalRecord(NeonWalRecord::wal_append(",0x30")),
),
(get_key(3), Lsn(0x10), Value::Image("0x10".into())),
(
get_key(3),
Lsn(0x20),
Value::WalRecord(NeonWalRecord::wal_clear()),
),
(get_key(4), Lsn(0x10), Value::Image("0x10".into())),
(
get_key(4),
Lsn(0x20),
Value::WalRecord(NeonWalRecord::wal_init()),
),
];
let image1 = vec![(get_key(1), "0x10".into())];

let tline = tenant
.create_test_timeline_with_layers(
TIMELINE_ID,
Lsn(0x10),
DEFAULT_PG_VERSION,
&ctx,
vec![delta1], // delta layers
vec![(Lsn(0x10), image1)], // image layers
Lsn(0x50),
)
.await?;

assert_eq!(
tline.get(get_key(1), Lsn(0x50), &ctx).await?,
Bytes::from_static(b"0x10,0x20,0x30")
);
assert_eq!(
tline.get(get_key(2), Lsn(0x50), &ctx).await?,
Bytes::from_static(b"0x10,0x20,0x30")
);
// assert_eq!(tline.get(get_key(3), Lsn(0x50), &ctx).await?, Bytes::new());
// assert_eq!(tline.get(get_key(4), Lsn(0x50), &ctx).await?, Bytes::new());

Ok(())
}
}

@@ -11,7 +11,10 @@
//! len < 128: 0XXXXXXX
//! len >= 128: 1XXXXXXX XXXXXXXX XXXXXXXX XXXXXXXX
//!
use async_compression::Level;
use bytes::{BufMut, BytesMut};
use pageserver_api::models::ImageCompressionAlgorithm;
use tokio::io::AsyncWriteExt;
use tokio_epoll_uring::{BoundedBuf, IoBuf, Slice};

use crate::context::RequestContext;
@@ -66,12 +69,29 @@ impl<'a> BlockCursor<'a> {
len_buf.copy_from_slice(&buf[off..off + 4]);
off += 4;
}
len_buf[0] &= 0x7f;
len_buf[0] &= 0x0f;
u32::from_be_bytes(len_buf) as usize
};
let compression_bits = first_len_byte & 0xf0;

dstbuf.clear();
dstbuf.reserve(len);
let mut tmp_buf = Vec::new();
let buf_to_write;
let compression = if compression_bits <= BYTE_UNCOMPRESSED {
buf_to_write = dstbuf;
None
} else if compression_bits == BYTE_ZSTD || compression_bits == BYTE_LZ4 {
buf_to_write = &mut tmp_buf;
Some(dstbuf)
} else {
let error = std::io::Error::new(
std::io::ErrorKind::InvalidData,
format!("invalid compression byte {compression_bits:x}"),
);
return Err(error);
};

buf_to_write.clear();
buf_to_write.reserve(len);

// Read the payload
let mut remain = len;
@@ -85,14 +105,38 @@ impl<'a> BlockCursor<'a> {
page_remain = PAGE_SZ;
}
let this_blk_len = min(remain, page_remain);
dstbuf.extend_from_slice(&buf[off..off + this_blk_len]);
buf_to_write.extend_from_slice(&buf[off..off + this_blk_len]);
remain -= this_blk_len;
off += this_blk_len;
}

if let Some(dstbuf) = compression {
if compression_bits == BYTE_ZSTD {
let mut decoder = async_compression::tokio::write::ZstdDecoder::new(dstbuf);
decoder.write_all(buf_to_write).await?;
decoder.flush().await?;
} else if compression_bits == BYTE_LZ4 {
let decompressed = lz4_flex::block::decompress_size_prepended(&buf_to_write)
.map_err(|e| {
std::io::Error::new(
std::io::ErrorKind::InvalidData,
format!("lz4 decompression error: {e:?}"),
)
})?;
dstbuf.extend_from_slice(&decompressed);
} else {
unreachable!("already checked above")
}
}

Ok(())
}
}

const BYTE_UNCOMPRESSED: u8 = 0x80;
const BYTE_ZSTD: u8 = BYTE_UNCOMPRESSED | 0x10;
const BYTE_LZ4: u8 = BYTE_UNCOMPRESSED | 0x20;
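
// A minimal sketch (not part of the diff above) of how the 4-byte length header is laid
// out: the top nibble of the first byte carries 0x80 plus the optional compression
// marker, and the remaining 28 bits carry the length. Function names are illustrative.
fn encode_len_header(len: u32, compression_bits: u8) -> [u8; 4] {
    assert!(len <= 0x0fff_ffff, "length must fit in 28 bits");
    let mut buf = len.to_be_bytes();
    buf[0] |= compression_bits; // e.g. BYTE_UNCOMPRESSED, BYTE_ZSTD or BYTE_LZ4
    buf
}

fn decode_len_header(buf: [u8; 4]) -> (u8, u32) {
    let compression_bits = buf[0] & 0xf0;
    let mut len_buf = buf;
    len_buf[0] &= 0x0f;
    (compression_bits, u32::from_be_bytes(len_buf))
}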

/// A wrapper of `VirtualFile` that allows users to write blobs.
///
/// If a `BlobWriter` is dropped, the internal buffer will be
@@ -219,6 +263,17 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
&mut self,
srcbuf: B,
ctx: &RequestContext,
) -> (B::Buf, Result<u64, Error>) {
self.write_blob_compressed(srcbuf, ctx, None).await
}

/// Write a blob of data. Returns the offset that it was written to,
/// which can be used to retrieve the data later.
pub async fn write_blob_compressed<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
&mut self,
srcbuf: B,
ctx: &RequestContext,
algorithm: Option<ImageCompressionAlgorithm>,
) -> (B::Buf, Result<u64, Error>) {
let offset = self.offset;

@@ -226,29 +281,75 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {

let mut io_buf = self.io_buf.take().expect("we always put it back below");
io_buf.clear();
let (io_buf, hdr_res) = async {
let mut compressed_buf = None;
let ((io_buf, hdr_res), srcbuf) = async {
if len < 128 {
// Short blob. Write a 1-byte length header
io_buf.put_u8(len as u8);
self.write_all(io_buf, ctx).await
(
self.write_all(io_buf, ctx).await,
srcbuf.slice(..).into_inner(),
)
} else {
// Write a 4-byte length header
if len > 0x7fff_ffff {
if len > 0x0fff_ffff {
return (
io_buf,
Err(Error::new(
ErrorKind::Other,
format!("blob too large ({len} bytes)"),
)),
(
io_buf,
Err(Error::new(
ErrorKind::Other,
format!("blob too large ({len} bytes)"),
)),
),
srcbuf.slice(..).into_inner(),
);
}
if len > 0x0fff_ffff {
tracing::warn!("writing blob above future limit ({len} bytes)");
}
let mut len_buf = (len as u32).to_be_bytes();
len_buf[0] |= 0x80;
use ImageCompressionAlgorithm::*;
let (high_bit_mask, len_written, srcbuf) = match algorithm {
Some(ZstdLow | Zstd | ZstdHigh) => {
let mut encoder = if matches!(algorithm, Some(ZstdLow)) {
async_compression::tokio::write::ZstdEncoder::with_quality(
Vec::new(),
Level::Precise(1),
)
} else if matches!(algorithm, Some(ZstdHigh)) {
async_compression::tokio::write::ZstdEncoder::with_quality(
Vec::new(),
Level::Precise(6),
)
} else {
async_compression::tokio::write::ZstdEncoder::new(Vec::new())
};
let slice = srcbuf.slice(..);
encoder.write_all(&slice[..]).await.unwrap();
encoder.shutdown().await.unwrap();
let compressed = encoder.into_inner();
if compressed.len() < len {
let compressed_len = compressed.len();
compressed_buf = Some(compressed);
(BYTE_ZSTD, compressed_len, slice.into_inner())
} else {
(BYTE_UNCOMPRESSED, len, slice.into_inner())
}
}
Some(ImageCompressionAlgorithm::LZ4) => {
let slice = srcbuf.slice(..);
let compressed = lz4_flex::block::compress_prepend_size(&slice[..]);
if compressed.len() < len {
let compressed_len = compressed.len();
compressed_buf = Some(compressed);
(BYTE_LZ4, compressed_len, slice.into_inner())
} else {
(BYTE_UNCOMPRESSED, len, slice.into_inner())
}
}
None => (BYTE_UNCOMPRESSED, len, srcbuf.slice(..).into_inner()),
};
let mut len_buf = (len_written as u32).to_be_bytes();
assert_eq!(len_buf[0] & 0xf0, 0);
len_buf[0] |= high_bit_mask;
io_buf.extend_from_slice(&len_buf[..]);
self.write_all(io_buf, ctx).await
(self.write_all(io_buf, ctx).await, srcbuf)
}
}
.await;
@@ -257,7 +358,12 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
Ok(_) => (),
Err(e) => return (Slice::into_inner(srcbuf.slice(..)), Err(e)),
}
let (srcbuf, res) = self.write_all(srcbuf, ctx).await;
let (srcbuf, res) = if let Some(compressed_buf) = compressed_buf {
let (_buf, res) = self.write_all(compressed_buf, ctx).await;
(Slice::into_inner(srcbuf.slice(..)), res)
} else {
self.write_all(srcbuf, ctx).await
};
(srcbuf, res.map(|_| offset))
}
}
@@ -295,6 +401,12 @@ mod tests {
use rand::{Rng, SeedableRng};

async fn round_trip_test<const BUFFERED: bool>(blobs: &[Vec<u8>]) -> Result<(), Error> {
round_trip_test_compressed::<BUFFERED, 0>(blobs).await
}

async fn round_trip_test_compressed<const BUFFERED: bool, const COMPRESSION: u8>(
blobs: &[Vec<u8>],
) -> Result<(), Error> {
let temp_dir = camino_tempfile::tempdir()?;
let pathbuf = temp_dir.path().join("file");
let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
@@ -305,7 +417,26 @@ mod tests {
let file = VirtualFile::create(pathbuf.as_path(), &ctx).await?;
let mut wtr = BlobWriter::<BUFFERED>::new(file, 0);
for blob in blobs.iter() {
let (_, res) = wtr.write_blob(blob.clone(), &ctx).await;
let (_, res) = match COMPRESSION {
0 => wtr.write_blob(blob.clone(), &ctx).await,
1 => {
wtr.write_blob_compressed(
blob.clone(),
&ctx,
Some(ImageCompressionAlgorithm::ZstdLow),
)
.await
}
2 => {
wtr.write_blob_compressed(
blob.clone(),
&ctx,
Some(ImageCompressionAlgorithm::LZ4),
)
.await
}
_ => unreachable!("Invalid compression {COMPRESSION}"),
};
let offs = res?;
offsets.push(offs);
}
@@ -361,10 +492,17 @@ mod tests {
let blobs = &[
b"test".to_vec(),
random_array(10 * PAGE_SZ),
b"hello".to_vec(),
random_array(66 * PAGE_SZ),
vec![0xf3; 24 * PAGE_SZ],
b"foobar".to_vec(),
];
round_trip_test::<false>(blobs).await?;
round_trip_test::<true>(blobs).await?;
round_trip_test_compressed::<false, 1>(blobs).await?;
round_trip_test_compressed::<true, 1>(blobs).await?;
round_trip_test_compressed::<false, 2>(blobs).await?;
round_trip_test_compressed::<true, 2>(blobs).await?;
Ok(())
}

@@ -377,9 +377,6 @@ pub struct TenantConf {
/// There is a `last_aux_file_policy` flag which gets persisted in `index_part.json` once the first aux
/// file is written.
pub switch_aux_file_policy: AuxFilePolicy,

#[cfg(feature = "testing")]
pub test_vm_bit_debug_logging: bool,
}

/// Same as TenantConf, but this struct preserves the information about
@@ -479,11 +476,6 @@ pub struct TenantConfOpt {
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(default)]
pub switch_aux_file_policy: Option<AuxFilePolicy>,

#[cfg(feature = "testing")]
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(default)]
pub test_vm_bit_debug_logging: Option<bool>,
}

impl TenantConfOpt {
@@ -546,10 +538,6 @@ impl TenantConfOpt {
switch_aux_file_policy: self
.switch_aux_file_policy
.unwrap_or(global_conf.switch_aux_file_policy),
#[cfg(feature = "testing")]
test_vm_bit_debug_logging: self
.test_vm_bit_debug_logging
.unwrap_or(global_conf.test_vm_bit_debug_logging),
}
}
}
@@ -594,8 +582,6 @@ impl Default for TenantConf {
timeline_get_throttle: crate::tenant::throttle::Config::disabled(),
image_layer_creation_check_threshold: DEFAULT_IMAGE_LAYER_CREATION_CHECK_THRESHOLD,
switch_aux_file_policy: AuxFilePolicy::default_tenant_config(),
#[cfg(feature = "testing")]
test_vm_bit_debug_logging: false,
}
}
}
@@ -671,8 +657,6 @@ impl From<TenantConfOpt> for models::TenantConfig {
timeline_get_throttle: value.timeline_get_throttle.map(ThrottleConfig::from),
image_layer_creation_check_threshold: value.image_layer_creation_check_threshold,
switch_aux_file_policy: value.switch_aux_file_policy,
#[cfg(feature = "testing")]
test_vm_bit_debug_logging: value.test_vm_bit_debug_logging,
}
}
}

@@ -513,7 +513,7 @@ impl<'a> TenantDownloader<'a> {
// cover our access to local storage.
let Ok(_guard) = self.secondary_state.gate.enter() else {
// Shutting down
return Ok(());
return Err(UpdateError::Cancelled);
};

let tenant_shard_id = self.secondary_state.get_tenant_shard_id();
@@ -846,7 +846,7 @@ impl<'a> TenantDownloader<'a> {
for layer in timeline.layers {
if self.secondary_state.cancel.is_cancelled() {
tracing::debug!("Cancelled -- dropping out of layer loop");
return Ok(());
return Err(UpdateError::Cancelled);
}

// Existing on-disk layers: just update their access time.

@@ -3,7 +3,6 @@ use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;

use anyhow::{bail, Context};
use tokio::sync::oneshot::error::RecvError;
use tokio::sync::Semaphore;
use tokio_util::sync::CancellationToken;
@@ -11,7 +10,7 @@ use tokio_util::sync::CancellationToken;
use crate::context::RequestContext;
use crate::pgdatadir_mapping::CalculateLogicalSizeError;

use super::{LogicalSizeCalculationCause, Tenant};
use super::{GcError, LogicalSizeCalculationCause, Tenant};
use crate::tenant::Timeline;
use utils::id::TimelineId;
use utils::lsn::Lsn;
@@ -43,6 +42,44 @@ pub struct SegmentMeta {
pub kind: LsnKind,
}

#[derive(thiserror::Error, Debug)]
pub(crate) enum CalculateSyntheticSizeError {
/// Something went wrong internally in the calculation of logical size at a particular branch point
#[error("Failed to calculate logical size on timeline {timeline_id} at {lsn}: {error}")]
LogicalSize {
timeline_id: TimelineId,
lsn: Lsn,
error: CalculateLogicalSizeError,
},

/// Something went wrong internally when calculating GC parameters at start of size calculation
#[error(transparent)]
GcInfo(GcError),

/// Totally unexpected errors, like panics joining a task
#[error(transparent)]
Fatal(anyhow::Error),

/// The LSN we are trying to calculate a size at no longer exists at the point we query it
#[error("Could not find size at {lsn} in timeline {timeline_id}")]
LsnNotFound { timeline_id: TimelineId, lsn: Lsn },

/// Tenant shut down while calculating size
#[error("Cancelled")]
Cancelled,
}

impl From<GcError> for CalculateSyntheticSizeError {
fn from(value: GcError) -> Self {
match value {
GcError::TenantCancelled | GcError::TimelineCancelled => {
CalculateSyntheticSizeError::Cancelled
}
other => CalculateSyntheticSizeError::GcInfo(other),
}
}
}
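
// A minimal, self-contained sketch of the conversion pattern above: a `From` impl lets
// `?` collapse the "cancelled" variants of an inner error into a single `Cancelled`
// variant of the outer error. The enums below are simplified stand-ins, not the real
// pageserver types.
#[derive(Debug)]
enum InnerError {
    TenantCancelled,
    TimelineCancelled,
    Other(String),
}

#[derive(Debug)]
enum OuterError {
    Cancelled,
    Inner(InnerError),
}

impl From<InnerError> for OuterError {
    fn from(value: InnerError) -> Self {
        match value {
            InnerError::TenantCancelled | InnerError::TimelineCancelled => OuterError::Cancelled,
            other => OuterError::Inner(other),
        }
    }
}

fn step() -> Result<(), InnerError> {
    Err(InnerError::TenantCancelled)
}

fn calculate() -> Result<(), OuterError> {
    // `?` applies the From impl, so cancellation surfaces as OuterError::Cancelled
    step()?;
    Ok(())
}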

impl SegmentMeta {
fn size_needed(&self) -> bool {
match self.kind {
@@ -116,12 +153,9 @@ pub(super) async fn gather_inputs(
cause: LogicalSizeCalculationCause,
cancel: &CancellationToken,
ctx: &RequestContext,
) -> anyhow::Result<ModelInputs> {
) -> Result<ModelInputs, CalculateSyntheticSizeError> {
// refresh is needed to update gc related pitr_cutoff and horizon_cutoff
tenant
.refresh_gc_info(cancel, ctx)
.await
.context("Failed to refresh gc_info before gathering inputs")?;
tenant.refresh_gc_info(cancel, ctx).await?;

// Collect information about all the timelines
let mut timelines = tenant.list_timelines();
@@ -327,6 +361,12 @@ pub(super) async fn gather_inputs(
)
.await?;

if tenant.cancel.is_cancelled() {
// If we're shutting down, return an error rather than a sparse result that might include some
// timelines from before we started shutting down
return Err(CalculateSyntheticSizeError::Cancelled);
}

Ok(ModelInputs {
segments,
timeline_inputs,
@@ -345,7 +385,7 @@ async fn fill_logical_sizes(
logical_size_cache: &mut HashMap<(TimelineId, Lsn), u64>,
cause: LogicalSizeCalculationCause,
ctx: &RequestContext,
) -> anyhow::Result<()> {
) -> Result<(), CalculateSyntheticSizeError> {
let timeline_hash: HashMap<TimelineId, Arc<Timeline>> = HashMap::from_iter(
timelines
.iter()
@@ -387,7 +427,7 @@ async fn fill_logical_sizes(
}

// Perform the size lookups
let mut have_any_error = false;
let mut have_any_error = None;
while let Some(res) = joinset.join_next().await {
// each of these come with Result<anyhow::Result<_>, JoinError>
// because of spawn + spawn_blocking
@@ -398,21 +438,36 @@ async fn fill_logical_sizes(
Err(join_error) => {
// cannot really do anything, as this panic is likely a bug
error!("task that calls spawn_ondemand_logical_size_calculation panicked: {join_error:#}");
have_any_error = true;

have_any_error = Some(CalculateSyntheticSizeError::Fatal(
anyhow::anyhow!(join_error)
.context("task that calls spawn_ondemand_logical_size_calculation"),
));
}
Ok(Err(recv_result_error)) => {
// cannot really do anything, as this panic is likely a bug
error!("failed to receive logical size query result: {recv_result_error:#}");
have_any_error = true;
have_any_error = Some(CalculateSyntheticSizeError::Fatal(
anyhow::anyhow!(recv_result_error)
.context("Receiving logical size query result"),
));
}
Ok(Ok(TimelineAtLsnSizeResult(timeline, lsn, Err(error)))) => {
if !matches!(error, CalculateLogicalSizeError::Cancelled) {
if matches!(error, CalculateLogicalSizeError::Cancelled) {
// Skip this: it's okay if one timeline among many is shutting down while we
// calculate inputs for the overall tenant.
continue;
} else {
warn!(
timeline_id=%timeline.timeline_id,
"failed to calculate logical size at {lsn}: {error:#}"
);
have_any_error = Some(CalculateSyntheticSizeError::LogicalSize {
timeline_id: timeline.timeline_id,
lsn,
error,
});
}
have_any_error = true;
}
Ok(Ok(TimelineAtLsnSizeResult(timeline, lsn, Ok(size)))) => {
debug!(timeline_id=%timeline.timeline_id, %lsn, size, "size calculated");
@@ -426,10 +481,10 @@ async fn fill_logical_sizes(
// prune any keys not needed anymore; we record every used key and added key.
logical_size_cache.retain(|key, _| sizes_needed.contains_key(key));

if have_any_error {
if let Some(error) = have_any_error {
// we cannot complete this round, because we are missing data.
// we have however cached all we were able to request calculation on.
anyhow::bail!("failed to calculate some logical_sizes");
return Err(error);
}

// Insert the looked up sizes to the Segments
@@ -444,32 +499,29 @@ async fn fill_logical_sizes(
if let Some(Some(size)) = sizes_needed.get(&(timeline_id, lsn)) {
seg.segment.size = Some(*size);
} else {
bail!("could not find size at {} in timeline {}", lsn, timeline_id);
return Err(CalculateSyntheticSizeError::LsnNotFound { timeline_id, lsn });
}
}
Ok(())
}

impl ModelInputs {
pub fn calculate_model(&self) -> anyhow::Result<tenant_size_model::StorageModel> {
pub fn calculate_model(&self) -> tenant_size_model::StorageModel {
// Convert SegmentMetas into plain Segments
let storage = StorageModel {
StorageModel {
segments: self
.segments
.iter()
.map(|seg| seg.segment.clone())
.collect(),
};

Ok(storage)
}
}

// calculate total project size
pub fn calculate(&self) -> anyhow::Result<u64> {
let storage = self.calculate_model()?;
pub fn calculate(&self) -> u64 {
let storage = self.calculate_model();
let sizes = storage.calculate();

Ok(sizes.total_size)
sizes.total_size
}
}

@@ -656,7 +708,7 @@ fn verify_size_for_multiple_branches() {
"#;
let inputs: ModelInputs = serde_json::from_str(doc).unwrap();

assert_eq!(inputs.calculate().unwrap(), 37_851_408);
assert_eq!(inputs.calculate(), 37_851_408);
}

#[test]
@@ -711,7 +763,7 @@ fn verify_size_for_one_branch() {

let model: ModelInputs = serde_json::from_str(doc).unwrap();

let res = model.calculate_model().unwrap().calculate();
let res = model.calculate_model().calculate();

println!("calculated synthetic size: {}", res.total_size);
println!("result: {:?}", serde_json::to_string(&res.segments));

@@ -73,7 +73,7 @@ where
/// the same ValueReconstructState struct in the next 'get_value_reconstruct_data'
/// call, to collect more records.
///
#[derive(Debug, Default, Clone, PartialEq, Eq)]
#[derive(Debug, Default)]
pub struct ValueReconstructState {
pub records: Vec<(Lsn, NeonWalRecord)>,
pub img: Option<(Lsn, Bytes)>,
@@ -318,7 +318,7 @@ pub(crate) struct LayerFringe {
#[derive(Debug)]
struct LayerKeyspace {
layer: ReadableLayer,
target_keyspace: Vec<KeySpace>,
target_keyspace: KeySpaceRandomAccum,
}

impl LayerFringe {
@@ -342,17 +342,13 @@ impl LayerFringe {
_,
LayerKeyspace {
layer,
target_keyspace,
mut target_keyspace,
},
)) => {
let mut keyspace = KeySpaceRandomAccum::new();
for ks in target_keyspace {
for part in ks.ranges {
keyspace.add_range(part);
}
}
Some((layer, keyspace.consume_keyspace(), read_desc.lsn_range))
}
)) => Some((
layer,
target_keyspace.consume_keyspace(),
read_desc.lsn_range,
)),
None => unreachable!("fringe internals are always consistent"),
}
}
@@ -367,16 +363,18 @@ impl LayerFringe {
let entry = self.layers.entry(layer_id.clone());
match entry {
Entry::Occupied(mut entry) => {
entry.get_mut().target_keyspace.push(keyspace);
entry.get_mut().target_keyspace.add_keyspace(keyspace);
}
Entry::Vacant(entry) => {
self.planned_reads_by_lsn.push(ReadDesc {
lsn_range,
layer_id: layer_id.clone(),
});
let mut accum = KeySpaceRandomAccum::new();
accum.add_keyspace(keyspace);
entry.insert(LayerKeyspace {
layer,
target_keyspace: vec![keyspace],
target_keyspace: accum,
});
}
}
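
// A minimal sketch of the accumulator pattern this change relies on: keyspaces are added
// to a KeySpaceRandomAccum as reads are planned, and a single consume_keyspace() call
// later yields the merged result. The inputs here are assumed KeySpace values produced
// elsewhere, and consume_keyspace() is assumed to return the merged KeySpace.
fn merge_keyspaces(first: KeySpace, second: KeySpace) -> KeySpace {
    let mut accum = KeySpaceRandomAccum::new();
    accum.add_keyspace(first);
    accum.add_keyspace(second);
    accum.consume_keyspace()
}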

@@ -219,7 +219,6 @@ pub struct DeltaLayerInner {
// values copied from summary
index_start_blk: u32,
index_root_blk: u32,
lsn_range: Range<Lsn>,

file: VirtualFile,
file_id: FileId,
@@ -785,7 +784,6 @@ impl DeltaLayerInner {
file_id,
index_start_blk: actual_summary.index_start_blk,
index_root_blk: actual_summary.index_root_blk,
lsn_range: actual_summary.lsn_range,
max_vectored_read_bytes,
}))
}
@@ -822,7 +820,6 @@ impl DeltaLayerInner {
if entry_lsn < lsn_range.start {
return false;
}
assert!(entry_lsn <= search_key.lsn(), "certain because of how backwards visit direction works");
offsets.push((entry_lsn, blob_ref.pos()));

!blob_ref.will_init()
@@ -912,7 +909,7 @@ impl DeltaLayerInner {

let reads = Self::plan_reads(
&keyspace,
lsn_range,
lsn_range.clone(),
data_end_offset,
index_reader,
planner,
@@ -925,7 +922,7 @@ impl DeltaLayerInner {
self.do_reads_and_update_state(reads, reconstruct_state, ctx)
.await;

reconstruct_state.on_lsn_advanced(&keyspace, self.lsn_range.start);
reconstruct_state.on_lsn_advanced(&keyspace, lsn_range.start);

Ok(())
}

@@ -46,17 +46,19 @@ use camino::{Utf8Path, Utf8PathBuf};
use hex;
use itertools::Itertools;
use pageserver_api::keyspace::KeySpace;
use pageserver_api::models::LayerAccessKind;
use pageserver_api::models::{ImageCompressionAlgorithm, LayerAccessKind};
use pageserver_api::shard::{ShardIdentity, TenantShardId};
use rand::{distributions::Alphanumeric, Rng};
use serde::{Deserialize, Serialize};
use std::fs::File;
use std::io::SeekFrom;
use std::ops::Range;
use std::os::unix::fs::MetadataExt;
use std::os::unix::prelude::FileExt;
use std::str::FromStr;
use std::sync::Arc;
use tokio::sync::OnceCell;
use tokio::time::Instant;
use tokio_stream::StreamExt;
use tracing::*;

@@ -366,6 +368,170 @@ impl ImageLayer {
res?;
Ok(())
}
pub async fn compression_statistics(
dest_repo_path: &Utf8Path,
path: &Utf8Path,
ctx: &RequestContext,
) -> anyhow::Result<Vec<(Option<ImageCompressionAlgorithm>, u64, u64, u64)>> {
fn make_conf(
image_compression: Option<ImageCompressionAlgorithm>,
dest_repo_path: &Utf8Path,
) -> &'static PageServerConf {
let mut conf = PageServerConf::dummy_conf(dest_repo_path.to_owned());
conf.image_compression = image_compression;
Box::leak(Box::new(conf))
}
let image_compressions = [
None,
Some(ImageCompressionAlgorithm::ZstdLow),
Some(ImageCompressionAlgorithm::Zstd),
Some(ImageCompressionAlgorithm::ZstdHigh),
Some(ImageCompressionAlgorithm::LZ4),
];
let confs = image_compressions
.clone()
.map(|compression| make_conf(compression, dest_repo_path));
let mut stats = Vec::new();
for (image_compression, conf) in image_compressions.into_iter().zip(confs) {
let start_compression = Instant::now();
let compressed_path = Self::compress_for_conf(path, ctx, conf).await?;
let path_to_delete = compressed_path.clone();
scopeguard::defer!({
let _ = std::fs::remove_file(path_to_delete);
});
let size = path.metadata()?.size();
let elapsed_ms = start_compression.elapsed().as_millis() as u64;
let start_decompression = Instant::now();
Self::compare_are_equal(path, &compressed_path, ctx, &image_compression).await?;
let elapsed_decompression_ms = start_decompression.elapsed().as_millis() as u64;
stats.push((
image_compression,
size,
elapsed_ms,
elapsed_decompression_ms,
));
tokio::task::yield_now().await;
}
Ok(stats)
}
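
// A minimal usage sketch for the helper above: run it against an existing image layer
// file and print one row per algorithm. The paths and calling context here are
// placeholders, not values taken from the pageserver.
async fn print_compression_statistics(ctx: &RequestContext) -> anyhow::Result<()> {
    let dest_repo_path = Utf8Path::new("/tmp/compression-stats-repo");
    let layer_path = Utf8Path::new("/tmp/some-image-layer");
    let stats = ImageLayer::compression_statistics(dest_repo_path, layer_path, ctx).await?;
    for (algorithm, size_bytes, compress_ms, verify_ms) in stats {
        println!("{algorithm:?}: {size_bytes} bytes, compressed in {compress_ms} ms, verified in {verify_ms} ms");
    }
    Ok(())
}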

async fn compress_for_conf(
path: &Utf8Path,
ctx: &RequestContext,
conf: &'static PageServerConf,
) -> anyhow::Result<Utf8PathBuf> {
let file =
VirtualFile::open_with_options(path, virtual_file::OpenOptions::new().read(true), ctx)
.await
.with_context(|| format!("Failed to open file '{}'", path))?;

let file_id = page_cache::next_file_id();
let block_reader = FileBlockReader::new(&file, file_id);
let summary_blk = block_reader.read_blk(0, ctx).await?;
let summary = Summary::des_prefix(summary_blk.as_ref()).context("deserialize")?;
if summary.magic != IMAGE_FILE_MAGIC {
anyhow::bail!("magic file mismatch");
}

let tree_reader = DiskBtreeReader::new(
summary.index_start_blk,
summary.index_root_blk,
&block_reader,
);

let mut key_offset_stream =
std::pin::pin!(tree_reader.get_stream_from(&[0u8; KEY_SIZE], ctx));

let tenant_shard_id = TenantShardId::unsharded(summary.tenant_id);
let timeline_path = conf.timeline_path(&tenant_shard_id, &summary.timeline_id);
tokio::fs::create_dir_all(timeline_path).await?;

let mut writer = ImageLayerWriter::new(
conf,
summary.timeline_id,
tenant_shard_id,
&summary.key_range,
summary.lsn,
ctx,
)
.await?;

let cursor = block_reader.block_cursor();
while let Some(r) = key_offset_stream.next().await {
let (key, offset) = r?;
let key = Key::from_slice(&key);
let content = cursor.read_blob(offset, ctx).await?;
writer.put_image(key, content.into(), ctx).await?;
}
let path = writer.inner.take().unwrap().finish_inner(ctx).await?.2;
Ok(path)
}

async fn compare_are_equal(
path_a: &Utf8Path,
path_b: &Utf8Path,
ctx: &RequestContext,
cmp: &Option<ImageCompressionAlgorithm>,
) -> anyhow::Result<()> {
let mut files = Vec::new();
for path in [path_a, path_b] {
let file = VirtualFile::open_with_options(
path,
virtual_file::OpenOptions::new().read(true),
ctx,
)
.await
.with_context(|| format!("Failed to open file '{}'", path))?;
files.push(file);
}

let mut readers_summaries = Vec::new();
for file in files.iter() {
let file_id = page_cache::next_file_id();
let block_reader = FileBlockReader::new(&file, file_id);
let summary_blk = block_reader.read_blk(0, ctx).await?;
let summary = Summary::des_prefix(summary_blk.as_ref()).context("deserialize")?;
if summary.magic != IMAGE_FILE_MAGIC {
anyhow::bail!("magic file mismatch");
}
readers_summaries.push((block_reader, summary));
}

let mut tree_readers_cursors = Vec::new();
for (block_reader, summary) in readers_summaries.iter() {
let tree_reader = DiskBtreeReader::new(
summary.index_start_blk,
summary.index_root_blk,
block_reader,
);
let cursor = block_reader.block_cursor();
tree_readers_cursors.push((tree_reader, cursor));
}

let mut key_offset_stream_a = std::pin::pin!(tree_readers_cursors[0]
.0
.get_stream_from(&[0u8; KEY_SIZE], ctx));
let mut key_offset_stream_b = std::pin::pin!(tree_readers_cursors[1]
.0
.get_stream_from(&[0u8; KEY_SIZE], ctx));
while let Some(r) = key_offset_stream_a.next().await {
let (key_a, offset_a): (Vec<u8>, _) = r?;
let Some(r) = key_offset_stream_b.next().await else {
panic!("second file at {path_b} has fewer keys than {path_a}");
};
let (key_b, offset_b): (Vec<u8>, _) = r?;
assert_eq!(key_a, key_b, "mismatch of keys for {path_a}:{path_b}");
let key = Key::from_slice(&key_a);
let content_a = tree_readers_cursors[0].1.read_blob(offset_a, ctx).await?;
let content_b = tree_readers_cursors[1].1.read_blob(offset_b, ctx).await?;
assert_eq!(
content_a, content_b,
"mismatch for key={key} cmp={cmp:?} and {path_a}:{path_b}"
);
//println!("match for key={key} cmp={cmp:?} from {path_a}");
}
Ok(())
}
}

impl ImageLayerInner {
@@ -782,7 +948,10 @@ impl ImageLayerWriterInner {
ctx: &RequestContext,
) -> anyhow::Result<()> {
ensure!(self.key_range.contains(&key));
let (_img, res) = self.blob_writer.write_blob(img, ctx).await;
let (_img, res) = self
.blob_writer
.write_blob_compressed(img, ctx, self.conf.image_compression)
.await;
// TODO: re-use the buffer for `img` further upstack
let off = res?;

@@ -796,11 +965,10 @@ impl ImageLayerWriterInner {
///
/// Finish writing the image layer.
///
async fn finish(
async fn finish_inner(
self,
timeline: &Arc<Timeline>,
ctx: &RequestContext,
) -> anyhow::Result<ResidentLayer> {
) -> anyhow::Result<(&'static PageServerConf, PersistentLayerDesc, Utf8PathBuf)> {
let index_start_blk =
((self.blob_writer.size() + PAGE_SZ as u64 - 1) / PAGE_SZ as u64) as u32;

@@ -854,8 +1022,16 @@ impl ImageLayerWriterInner {
// fsync the file
file.sync_all().await?;

Ok((self.conf, desc, self.path))
}
async fn finish(
self,
timeline: &Arc<Timeline>,
ctx: &RequestContext,
) -> anyhow::Result<ResidentLayer> {
let (conf, desc, path) = self.finish_inner(ctx).await?;
// FIXME: why not carry the virtualfile here, it supports renaming?
let layer = Layer::finish_creating(self.conf, timeline, desc, &self.path)?;
let layer = Layer::finish_creating(conf, timeline, desc, &path)?;

info!("created image layer {}", layer.local_path());

@@ -923,6 +1099,12 @@ impl ImageLayerWriter {
self.inner.as_mut().unwrap().put_image(key, img, ctx).await
}

/// Obtains the current size of the file
pub(crate) fn size(&self) -> u64 {
let inner = self.inner.as_ref().unwrap();
inner.blob_writer.size() + inner.tree.borrow_writer().size() + PAGE_SZ as u64
}

///
/// Finish writing the image layer.
///

@@ -62,6 +62,7 @@ use std::{
ops::ControlFlow,
};

use crate::metrics::GetKind;
use crate::pgdatadir_mapping::MAX_AUX_FILE_V2_DELTAS;
use crate::{
aux_file::AuxFileSizeEstimator,
@@ -75,7 +76,6 @@ use crate::{
disk_usage_eviction_task::DiskUsageEvictionInfo,
pgdatadir_mapping::CollectKeySpaceError,
};
use crate::{deletion_queue::DeletionQueueClient, metrics::GetKind};
use crate::{
disk_usage_eviction_task::finite_f32,
tenant::storage_layer::{
@@ -205,7 +205,6 @@ fn drop_wlock<T>(rlock: tokio::sync::RwLockWriteGuard<'_, T>) {
/// The outward-facing resources required to build a Timeline
pub struct TimelineResources {
pub remote_client: RemoteTimelineClient,
pub deletion_queue_client: DeletionQueueClient,
pub timeline_get_throttle: Arc<
crate::tenant::throttle::Throttle<&'static crate::metrics::tenant_throttling::TimelineGet>,
>,
@@ -910,9 +909,7 @@ impl Timeline {
img: cached_page_img,
};

self.get_impl(key, lsn, reconstruct_state, ctx)
.await
.map(|v| v.0)
self.get_impl(key, lsn, reconstruct_state, ctx).await
}
GetImpl::Vectored => {
let keyspace = KeySpace {
@@ -924,59 +921,16 @@ impl Timeline {
let mut reconstruct_state = ValuesReconstructState::new();

// Only add the cached image to the reconstruct state when it exists.
let cached_page_img_lsn = cached_page_img.as_ref().map(|(lsn, _)| *lsn);
if cached_page_img.is_some() {
let mut key_state = VectoredValueReconstructState::default();
key_state.img = cached_page_img;
reconstruct_state.keys.insert(key, Ok(key_state));
}

let debug_log = {
#[cfg(feature = "testing")]
{
self.get_test_vm_bit_debug_logging()
}
#[cfg(not(feature = "testing"))]
{
false
}
};

if debug_log {
tracing::info!(%key, %lsn, ?cached_page_img_lsn, "debug-logging page reconstruction");
}

if debug_log {
tracing::info!(
location = "before vectored get",
"debug-logging page reconstruction"
);
self.layers
.read()
.await
.layer_map()
.dump(false, ctx)
.await
.unwrap();
}

let vectored_res = self
.get_vectored_impl(keyspace.clone(), lsn, &mut reconstruct_state, ctx)
.await;

if debug_log {
tracing::info!(
location = "before validation",
"debug-logging page reconstruction"
);
self.layers
.read()
.await
.layer_map()
.dump(false, ctx)
.await
.unwrap();
}
if self.conf.validate_vectored_get {
self.validate_get_vectored_impl(&vectored_res, keyspace, lsn, ctx)
.await;
@@ -994,7 +948,7 @@ impl Timeline {
"Singular vectored get returned wrong key"
)))
} else {
value.map(|v| v.0)
value
}
}
None => Err(PageReconstructError::MissingKey(MissingKeyError {
@@ -1018,7 +972,7 @@ impl Timeline {
lsn: Lsn,
mut reconstruct_state: ValueReconstructState,
ctx: &RequestContext,
) -> Result<(Bytes, ValueReconstructState), PageReconstructError> {
) -> Result<Bytes, PageReconstructError> {
// XXX: structured stats collection for layer eviction here.
trace!(
"get page request for {}@{} from task kind {:?}",
@@ -1157,12 +1111,7 @@ impl Timeline {
}
}

let Ok(res) = res else {
return Err(res.unwrap_err());
};
Ok(BTreeMap::from_iter(
res.into_iter().map(|(k, v)| (k, v.map(|v| v.0))),
))
res
}

/// Scan the keyspace and return all existing key-values in the keyspace. This currently uses vectored
@@ -1226,12 +1175,7 @@ impl Timeline {
recording.observe(throttled);
}

let Ok(vectored_res) = vectored_res else {
return Err(vectored_res.unwrap_err());
};
Ok(BTreeMap::from_iter(
vectored_res.into_iter().map(|(k, v)| (k, v.map(|v| v.0))),
))
vectored_res
}

/// Not subject to [`Self::timeline_get_throttle`].
@@ -1240,10 +1184,7 @@ impl Timeline {
keyspace: KeySpace,
lsn: Lsn,
ctx: &RequestContext,
) -> Result<
BTreeMap<Key, Result<(Bytes, ValueReconstructState), PageReconstructError>>,
GetVectoredError,
> {
) -> Result<BTreeMap<Key, Result<Bytes, PageReconstructError>>, GetVectoredError> {
let mut values = BTreeMap::new();

for range in keyspace.ranges {
@@ -1302,10 +1243,7 @@ impl Timeline {
lsn: Lsn,
reconstruct_state: &mut ValuesReconstructState,
ctx: &RequestContext,
) -> Result<
BTreeMap<Key, Result<(Bytes, ValueReconstructState), PageReconstructError>>,
GetVectoredError,
> {
) -> Result<BTreeMap<Key, Result<Bytes, PageReconstructError>>, GetVectoredError> {
let get_kind = if keyspace.total_raw_size() == 1 {
GetKind::Singular
} else {
@@ -1322,10 +1260,7 @@ impl Timeline {
let reconstruct_timer = crate::metrics::RECONSTRUCT_TIME
.for_get_kind(get_kind)
.start_timer();
let mut results: BTreeMap<
Key,
Result<(Bytes, ValueReconstructState), PageReconstructError>,
> = BTreeMap::new();
let mut results: BTreeMap<Key, Result<Bytes, PageReconstructError>> = BTreeMap::new();
let layers_visited = reconstruct_state.get_layers_visited();

for (key, res) in std::mem::take(&mut reconstruct_state.keys) {
@@ -1361,10 +1296,7 @@ impl Timeline {
/// Not subject to [`Self::timeline_get_throttle`].
pub(super) async fn validate_get_vectored_impl(
&self,
vectored_res: &Result<
BTreeMap<Key, Result<(Bytes, ValueReconstructState), PageReconstructError>>,
GetVectoredError,
>,
vectored_res: &Result<BTreeMap<Key, Result<Bytes, PageReconstructError>>, GetVectoredError>,
keyspace: KeySpace,
lsn: Lsn,
ctx: &RequestContext,
@@ -1437,8 +1369,8 @@ impl Timeline {
key: &Key,
keyspace: &KeySpace,
lsn: Lsn,
(seq, seq_reconstruct_state): &(Bytes, ValueReconstructState),
(vec, vec_reconstruct_state): &(Bytes, ValueReconstructState),
seq: &Bytes,
vec: &Bytes,
) {
if *key == AUX_FILES_KEY {
// The value reconstruct of AUX_FILES_KEY from records is not deterministic
@@ -1461,16 +1393,10 @@ impl Timeline {
}
} else {
// All other keys should reconstruct deterministically, so we simply compare the blobs.
if seq != vec {
assert_eq!(
seq_reconstruct_state, vec_reconstruct_state,
"Reconstruct state mismatch for key {key} - keyspace={keyspace:?} lsn={lsn}"
);
assert_eq!(
seq, vec,
"Image mismatch for key {key} - keyspace={keyspace:?} lsn={lsn}"
);
}
assert_eq!(
seq, vec,
"Image mismatch for key {key} - keyspace={keyspace:?} lsn={lsn}"
);
}
}

@@ -2239,15 +2165,6 @@ impl Timeline {
.unwrap_or(default_tenant_conf.evictions_low_residence_duration_metric_threshold)
}

#[cfg(feature = "testing")]
fn get_test_vm_bit_debug_logging(&self) -> bool {
let tenant_conf = self.tenant_conf.load();
tenant_conf
.tenant_conf
.test_vm_bit_debug_logging
.unwrap_or(self.conf.default_tenant_conf.test_vm_bit_debug_logging)
}

fn get_image_layer_creation_check_threshold(&self) -> u8 {
let tenant_conf = self.tenant_conf.load();
tenant_conf
@@ -2263,10 +2180,6 @@ impl Timeline {
pub(super) fn tenant_conf_updated(&self, new_conf: &TenantConfOpt) {
// NB: Most tenant conf options are read by background loops, so,
// changes will automatically be picked up.
#[cfg(feature = "testing")]
{
info!(?new_conf.test_vm_bit_debug_logging, "updating tenant conf");
}

// The threshold is embedded in the metric. So, we need to update it.
{
@@ -4442,7 +4355,7 @@ impl Timeline {
let mut total_kb_retrieved = 0;
let mut total_keys_retrieved = 0;
for (k, v) in data {
let (v, _) = v.map_err(CreateImageLayersError::PageReconstructError)?;
let v = v.map_err(CreateImageLayersError::PageReconstructError)?;
total_kb_retrieved += KEY_SIZE + v.len();
total_keys_retrieved += 1;
new_data.insert(k, v);
@@ -4909,7 +4822,7 @@ impl Timeline {
pitr: Duration,
cancel: &CancellationToken,
ctx: &RequestContext,
) -> anyhow::Result<GcCutoffs> {
) -> Result<GcCutoffs, PageReconstructError> {
let _timer = self
.metrics
.find_gc_cutoffs_histo
@@ -5237,7 +5150,7 @@ impl Timeline {
key: Key,
request_lsn: Lsn,
mut data: ValueReconstructState,
) -> Result<(Bytes, ValueReconstructState), PageReconstructError> {
) -> Result<Bytes, PageReconstructError> {
// Perform WAL redo if needed
data.records.reverse();

@@ -5250,7 +5163,7 @@ impl Timeline {
img_lsn,
request_lsn,
);
Ok((img.clone(), data))
Ok(img.clone())
} else {
Err(PageReconstructError::from(anyhow!(
"base image for {key} at {request_lsn} not found"
@@ -5282,8 +5195,6 @@ impl Timeline {

let last_rec_lsn = data.records.last().unwrap().0;

let ret_state = data.clone();

let img = match self
.walredo_mgr
.as_ref()
@@ -5314,7 +5225,7 @@ impl Timeline {
}
}

Ok((img, ret_state))
Ok(img)
}
}
}

@@ -1064,7 +1064,7 @@ impl Timeline {
img: base_image,
records: delta_above_base_image,
};
let (img, _) = tline.reconstruct_value(key, horizon, state).await?;
let img = tline.reconstruct_value(key, horizon, state).await?;
Ok((keys_above_horizon, img))
}

@@ -11,7 +11,6 @@ use utils::{crashsafe, fs_ext, id::TimelineId, pausable_failpoint};

use crate::{
config::PageServerConf,
deletion_queue::DeletionQueueClient,
task_mgr::{self, TaskKind},
tenant::{
metadata::TimelineMetadata,
@@ -263,7 +262,6 @@ impl DeleteTimelineFlow {
timeline_id: TimelineId,
local_metadata: &TimelineMetadata,
remote_client: RemoteTimelineClient,
deletion_queue_client: DeletionQueueClient,
) -> anyhow::Result<()> {
// Note: here we even skip populating layer map. Timeline is essentially uninitialized.
// RemoteTimelineClient is the only functioning part.
@@ -274,7 +272,6 @@ impl DeleteTimelineFlow {
None, // Ancestor is not needed for deletion.
TimelineResources {
remote_client,
deletion_queue_client,
timeline_get_throttle: tenant.timeline_get_throttle.clone(),
},
// Important. We don't pass ancestor above because it can be missing.

@@ -49,6 +49,19 @@ pub enum NeonWalRecord {
file_path: String,
content: Option<Bytes>,
},

/// A testing record for unit testing purposes. It supports appending data to an existing image, or clearing it.
#[cfg(test)]
Test {
/// Append a string to the image.
append: String,
/// Clear the image before appending.
clear: bool,
/// Treat this record as an init record. `clear` should be set to true if this field is set
/// to true. This record does not need the history WALs to reconstruct. See [`NeonWalRecord::will_init`] and
/// its references in `timeline.rs`.
will_init: bool,
},
}

impl NeonWalRecord {
@@ -58,11 +71,39 @@ impl NeonWalRecord {
// If you change this function, you'll also need to change ValueBytes::will_init
match self {
NeonWalRecord::Postgres { will_init, rec: _ } => *will_init,

#[cfg(test)]
NeonWalRecord::Test { will_init, .. } => *will_init,
// None of the special neon record types currently initialize the page
_ => false,
}
}

#[cfg(test)]
pub(crate) fn wal_append(s: impl AsRef<str>) -> Self {
Self::Test {
append: s.as_ref().to_string(),
clear: false,
will_init: false,
}
}

#[cfg(test)]
pub(crate) fn wal_clear() -> Self {
Self::Test {
append: "".to_string(),
clear: true,
will_init: false,
}
}

#[cfg(test)]
pub(crate) fn wal_init() -> Self {
Self::Test {
append: "".to_string(),
clear: true,
will_init: true,
}
}
}

/// DecodedBkpBlock represents per-page data contained in a WAL record.

@@ -244,6 +244,20 @@ pub(crate) fn apply_in_neon(
let mut writer = page.writer();
dir.ser_into(&mut writer)?;
}
#[cfg(test)]
NeonWalRecord::Test {
append,
clear,
will_init,
} => {
if *will_init {
assert!(*clear, "init record must be clear to ensure correctness");
}
if *clear {
page.clear();
}
page.put_slice(append.as_bytes());
}
}
Ok(())
}

@@ -1,19 +1,8 @@
From 0b0194a57bd0f3598bd57dbedd0df3932330169d Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas <heikki.linnakangas@iki.fi>
Date: Fri, 2 Feb 2024 22:26:45 +0200
Subject: [PATCH 1/1] Make v0.6.0 work with Neon

Now that the WAL-logging happens as a separate step at the end of the
build, we need a few neon-specific hints to make it work.
---
src/hnswbuild.c | 36 ++++++++++++++++++++++++++++++++++++
1 file changed, 36 insertions(+)

diff --git a/src/hnswbuild.c b/src/hnswbuild.c
index 680789b..ec54dea 100644
index dcfb2bd..d5189ee 100644
--- a/src/hnswbuild.c
+++ b/src/hnswbuild.c
@@ -840,9 +840,17 @@ HnswParallelBuildMain(dsm_segment *seg, shm_toc *toc)
@@ -860,9 +860,17 @@ HnswParallelBuildMain(dsm_segment *seg, shm_toc *toc)

hnswarea = shm_toc_lookup(toc, PARALLEL_KEY_HNSW_AREA, false);

@@ -31,7 +20,7 @@ index 680789b..ec54dea 100644
/* Close relations within worker */
index_close(indexRel, indexLockmode);
table_close(heapRel, heapLockmode);
@@ -1089,13 +1097,41 @@ BuildIndex(Relation heap, Relation index, IndexInfo *indexInfo,
@@ -1117,12 +1125,38 @@ BuildIndex(Relation heap, Relation index, IndexInfo *indexInfo,
SeedRandom(42);
#endif

@@ -43,14 +32,13 @@ index 680789b..ec54dea 100644

BuildGraph(buildstate, forkNum);

- if (RelationNeedsWAL(index) || forkNum == INIT_FORKNUM)
+#ifdef NEON_SMGR
+ smgr_finish_unlogged_build_phase_1(RelationGetSmgr(index));
+#endif
+
if (RelationNeedsWAL(index))
+ {
log_newpage_range(index, forkNum, 0, RelationGetNumberOfBlocks(index), true);

+ if (RelationNeedsWAL(index) || forkNum == INIT_FORKNUM) {
log_newpage_range(index, forkNum, 0, RelationGetNumberOfBlocksInFork(index, forkNum), true);
+#ifdef NEON_SMGR
+ {
+#if PG_VERSION_NUM >= 160000
@@ -60,7 +48,7 @@ index 680789b..ec54dea 100644
+#endif
+
+ SetLastWrittenLSNForBlockRange(XactLastRecEnd, rlocator,
+ MAIN_FORKNUM, 0, RelationGetNumberOfBlocks(index));
+ MAIN_FORKNUM, 0, RelationGetNumberOfBlocks(index));
+ SetLastWrittenLSNForRelation(XactLastRecEnd, rlocator, MAIN_FORKNUM);
+ }
+#endif
@@ -69,10 +57,6 @@ index 680789b..ec54dea 100644
+#ifdef NEON_SMGR
+ smgr_end_unlogged_build(RelationGetSmgr(index));
+#endif
+

FreeBuildState(buildstate);
}

--
2.39.2

@@ -3112,12 +3112,12 @@ neon_read_slru_segment(SMgrRelation reln, const char* path, int segno, void* buf
request_lsn = UINT64_MAX;

/*
* GetRedoStartLsn() returns LSN of basebackup. We know that the SLRU
* GetRedoStartLsn() returns LSN of the basebackup. We know that the SLRU
* segment has not changed since the basebackup, because in order to
* modify it, we would have had to download it already. And once
* downloaded, we never evict SLRU segments from local disk.
*/
not_modified_since = GetRedoStartLsn();
not_modified_since = nm_adjust_lsn(GetRedoStartLsn());

SlruKind kind;

@@ -1,16 +1,183 @@
use measured::FixedCardinalityLabel;
use serde::{Deserialize, Serialize};
use std::fmt;
use std::fmt::{self, Display};

use crate::auth::IpPattern;

use crate::intern::{BranchIdInt, EndpointIdInt, ProjectIdInt};
use crate::proxy::retry::ShouldRetry;

/// Generic error response with human-readable description.
/// Note that we can't always present it to user as is.
#[derive(Debug, Deserialize)]
pub struct ConsoleError {
pub error: Box<str>,
#[serde(skip)]
pub http_status_code: http::StatusCode,
pub status: Option<Status>,
}

impl ConsoleError {
pub fn get_reason(&self) -> Reason {
self.status
.as_ref()
.and_then(|s| s.details.error_info.as_ref())
.map(|e| e.reason)
.unwrap_or(Reason::Unknown)
}
pub fn get_user_facing_message(&self) -> String {
use super::provider::errors::REQUEST_FAILED;
self.status
.as_ref()
.and_then(|s| s.details.user_facing_message.as_ref())
.map(|m| m.message.clone().into())
.unwrap_or_else(|| {
// Ask @neondatabase/control-plane for review before adding more.
match self.http_status_code {
http::StatusCode::NOT_FOUND => {
// Status 404: failed to get a project-related resource.
format!("{REQUEST_FAILED}: endpoint cannot be found")
}
http::StatusCode::NOT_ACCEPTABLE => {
// Status 406: endpoint is disabled (we don't allow connections).
format!("{REQUEST_FAILED}: endpoint is disabled")
}
http::StatusCode::LOCKED | http::StatusCode::UNPROCESSABLE_ENTITY => {
// Status 423: project might be in maintenance mode (or bad state), or quotas exceeded.
format!("{REQUEST_FAILED}: endpoint is temporarily unavailable. Check your quotas and/or contact our support.")
}
_ => REQUEST_FAILED.to_owned(),
}
})
}
}

impl Display for ConsoleError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let msg = self
.status
.as_ref()
.and_then(|s| s.details.user_facing_message.as_ref())
.map(|m| m.message.as_ref())
.unwrap_or_else(|| &self.error);
write!(f, "{}", msg)
}
}

impl ShouldRetry for ConsoleError {
|
||||
fn could_retry(&self) -> bool {
|
||||
if self.status.is_none() || self.status.as_ref().unwrap().details.retry_info.is_none() {
|
||||
// retry some temporary failures because the compute was in a bad state
|
||||
// (bad request can be returned when the endpoint was in transition)
|
||||
return match &self {
|
||||
ConsoleError {
|
||||
http_status_code: http::StatusCode::BAD_REQUEST,
|
||||
..
|
||||
} => true,
|
||||
// don't retry when quotas are exceeded
|
||||
ConsoleError {
|
||||
http_status_code: http::StatusCode::UNPROCESSABLE_ENTITY,
|
||||
ref error,
|
||||
..
|
||||
} => !error.contains("compute time quota of non-primary branches is exceeded"),
|
||||
// locked can be returned when the endpoint was in transition
|
||||
// or when quotas are exceeded. don't retry when quotas are exceeded
|
||||
ConsoleError {
|
||||
http_status_code: http::StatusCode::LOCKED,
|
||||
ref error,
|
||||
..
|
||||
} => {
|
||||
!error.contains("quota exceeded")
|
||||
&& !error.contains("the limit for current plan reached")
|
||||
}
|
||||
_ => false,
|
||||
};
|
||||
}
|
||||
|
||||
// retry if the response has a retry delay
|
||||
if let Some(retry_info) = self
|
||||
.status
|
||||
.as_ref()
|
||||
.and_then(|s| s.details.retry_info.as_ref())
|
||||
{
|
||||
retry_info.retry_delay_ms > 0
|
||||
} else {
|
||||
false
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
pub struct Status {
|
||||
pub code: Box<str>,
|
||||
pub message: Box<str>,
|
||||
pub details: Details,
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
pub struct Details {
|
||||
pub error_info: Option<ErrorInfo>,
|
||||
pub retry_info: Option<RetryInfo>,
|
||||
pub user_facing_message: Option<UserFacingMessage>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
pub struct ErrorInfo {
|
||||
pub reason: Reason,
|
||||
// Schema could also have `metadata` field, but it's not structured. Skip it for now.
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, Debug, Deserialize, Default)]
|
||||
pub enum Reason {
|
||||
#[serde(rename = "ROLE_PROTECTED")]
|
||||
RoleProtected,
|
||||
#[serde(rename = "RESOURCE_NOT_FOUND")]
|
||||
ResourceNotFound,
|
||||
#[serde(rename = "PROJECT_NOT_FOUND")]
|
||||
ProjectNotFound,
|
||||
#[serde(rename = "ENDPOINT_NOT_FOUND")]
|
||||
EndpointNotFound,
|
||||
#[serde(rename = "BRANCH_NOT_FOUND")]
|
||||
BranchNotFound,
|
||||
#[serde(rename = "RATE_LIMIT_EXCEEDED")]
|
||||
RateLimitExceeded,
|
||||
#[serde(rename = "NON_PRIMARY_BRANCH_COMPUTE_TIME_EXCEEDED")]
|
||||
NonPrimaryBranchComputeTimeExceeded,
|
||||
#[serde(rename = "ACTIVE_TIME_QUOTA_EXCEEDED")]
|
||||
ActiveTimeQuotaExceeded,
|
||||
#[serde(rename = "COMPUTE_TIME_QUOTA_EXCEEDED")]
|
||||
ComputeTimeQuotaExceeded,
|
||||
#[serde(rename = "WRITTEN_DATA_QUOTA_EXCEEDED")]
|
||||
WrittenDataQuotaExceeded,
|
||||
#[serde(rename = "DATA_TRANSFER_QUOTA_EXCEEDED")]
|
||||
DataTransferQuotaExceeded,
|
||||
#[serde(rename = "LOGICAL_SIZE_QUOTA_EXCEEDED")]
|
||||
LogicalSizeQuotaExceeded,
|
||||
#[default]
|
||||
#[serde(other)]
|
||||
Unknown,
|
||||
}
|
||||
|
||||
impl Reason {
|
||||
pub fn is_not_found(&self) -> bool {
|
||||
matches!(
|
||||
self,
|
||||
Reason::ResourceNotFound
|
||||
| Reason::ProjectNotFound
|
||||
| Reason::EndpointNotFound
|
||||
| Reason::BranchNotFound
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
pub struct RetryInfo {
|
||||
pub retry_delay_ms: u64,
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
pub struct UserFacingMessage {
|
||||
pub message: Box<str>,
|
||||
}
|
||||
|
||||
/// Response which holds client's auth secret, e.g. [`crate::scram::ServerSecret`].
|
||||
|
||||
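For orientation, here is a minimal sketch (Python, not part of the change) of the kind of error body the `Deserialize` derives above are written to accept: snake_case field names, SCREAMING_SNAKE_CASE reason codes, an optional retry hint, and no `http_status_code` field (it is `#[serde(skip)]` and filled in from the HTTP response). The concrete `error`, `code`, and `message` values here are made up; the real control-plane payload may differ.

```python
import json

# Hypothetical console error body matching the struct layout above.
payload = {
    "error": "compute time quota of non-primary branches is exceeded",
    "status": {
        "code": "RESOURCE_EXHAUSTED",
        "message": "quota exceeded",
        "details": {
            "error_info": {"reason": "NON_PRIMARY_BRANCH_COMPUTE_TIME_EXCEEDED"},
            "retry_info": {"retry_delay_ms": 0},
            "user_facing_message": {"message": "Compute time quota exceeded."},
        },
    },
}

body = json.loads(json.dumps(payload))
reason = (
    body.get("status", {})
    .get("details", {})
    .get("error_info", {})
    .get("reason", "UNKNOWN")
)
# Maps to Reason::NonPrimaryBranchComputeTimeExceeded via the serde renames above.
print(reason)
```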
@@ -25,8 +25,8 @@ use tracing::info;
|
||||
|
||||
pub mod errors {
|
||||
use crate::{
|
||||
console::messages::{self, ConsoleError},
|
||||
error::{io_error, ReportableError, UserFacingError},
|
||||
http,
|
||||
proxy::retry::ShouldRetry,
|
||||
};
|
||||
use thiserror::Error;
|
||||
@@ -34,17 +34,14 @@ pub mod errors {
|
||||
use super::ApiLockError;
|
||||
|
||||
/// A go-to error message which doesn't leak any detail.
|
||||
const REQUEST_FAILED: &str = "Console request failed";
|
||||
pub const REQUEST_FAILED: &str = "Console request failed";
|
||||
|
||||
/// Common console API error.
|
||||
#[derive(Debug, Error)]
|
||||
pub enum ApiError {
|
||||
/// Error returned by the console itself.
|
||||
#[error("{REQUEST_FAILED} with {}: {}", .status, .text)]
|
||||
Console {
|
||||
status: http::StatusCode,
|
||||
text: Box<str>,
|
||||
},
|
||||
#[error("{REQUEST_FAILED} with {0}")]
|
||||
Console(ConsoleError),
|
||||
|
||||
/// Various IO errors like broken pipe or malformed payload.
|
||||
#[error("{REQUEST_FAILED}: {0}")]
|
||||
@@ -53,11 +50,11 @@ pub mod errors {
|
||||
|
||||
impl ApiError {
|
||||
/// Returns HTTP status code if it's the reason for failure.
|
||||
pub fn http_status_code(&self) -> Option<http::StatusCode> {
|
||||
pub fn get_reason(&self) -> messages::Reason {
|
||||
use ApiError::*;
|
||||
match self {
|
||||
Console { status, .. } => Some(*status),
|
||||
_ => None,
|
||||
Console(e) => e.get_reason(),
|
||||
_ => messages::Reason::Unknown,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -67,22 +64,7 @@ pub mod errors {
|
||||
use ApiError::*;
|
||||
match self {
|
||||
// To minimize risks, only select errors are forwarded to users.
|
||||
// Ask @neondatabase/control-plane for review before adding more.
|
||||
Console { status, .. } => match *status {
|
||||
http::StatusCode::NOT_FOUND => {
|
||||
// Status 404: failed to get a project-related resource.
|
||||
format!("{REQUEST_FAILED}: endpoint cannot be found")
|
||||
}
|
||||
http::StatusCode::NOT_ACCEPTABLE => {
|
||||
// Status 406: endpoint is disabled (we don't allow connections).
|
||||
format!("{REQUEST_FAILED}: endpoint is disabled")
|
||||
}
|
||||
http::StatusCode::LOCKED | http::StatusCode::UNPROCESSABLE_ENTITY => {
|
||||
// Status 423: project might be in maintenance mode (or bad state), or quotas exceeded.
|
||||
format!("{REQUEST_FAILED}: endpoint is temporarily unavailable. Check your quotas and/or contact our support.")
|
||||
}
|
||||
_ => REQUEST_FAILED.to_owned(),
|
||||
},
|
||||
Console(c) => c.get_user_facing_message(),
|
||||
_ => REQUEST_FAILED.to_owned(),
|
||||
}
|
||||
}
|
||||
@@ -91,29 +73,56 @@ pub mod errors {
|
||||
impl ReportableError for ApiError {
|
||||
fn get_error_kind(&self) -> crate::error::ErrorKind {
|
||||
match self {
|
||||
ApiError::Console {
|
||||
status: http::StatusCode::NOT_FOUND | http::StatusCode::NOT_ACCEPTABLE,
|
||||
..
|
||||
} => crate::error::ErrorKind::User,
|
||||
ApiError::Console {
|
||||
status: http::StatusCode::UNPROCESSABLE_ENTITY,
|
||||
text,
|
||||
} if text.contains("compute time quota of non-primary branches is exceeded") => {
|
||||
crate::error::ErrorKind::User
|
||||
ApiError::Console(e) => {
|
||||
use crate::error::ErrorKind::*;
|
||||
match e.get_reason() {
|
||||
crate::console::messages::Reason::RoleProtected => User,
|
||||
crate::console::messages::Reason::ResourceNotFound => User,
|
||||
crate::console::messages::Reason::ProjectNotFound => User,
|
||||
crate::console::messages::Reason::EndpointNotFound => User,
|
||||
crate::console::messages::Reason::BranchNotFound => User,
|
||||
crate::console::messages::Reason::RateLimitExceeded => ServiceRateLimit,
|
||||
crate::console::messages::Reason::NonPrimaryBranchComputeTimeExceeded => {
|
||||
User
|
||||
}
|
||||
crate::console::messages::Reason::ActiveTimeQuotaExceeded => User,
|
||||
crate::console::messages::Reason::ComputeTimeQuotaExceeded => User,
|
||||
crate::console::messages::Reason::WrittenDataQuotaExceeded => User,
|
||||
crate::console::messages::Reason::DataTransferQuotaExceeded => User,
|
||||
crate::console::messages::Reason::LogicalSizeQuotaExceeded => User,
|
||||
crate::console::messages::Reason::Unknown => match &e {
|
||||
ConsoleError {
|
||||
http_status_code:
|
||||
http::StatusCode::NOT_FOUND | http::StatusCode::NOT_ACCEPTABLE,
|
||||
..
|
||||
} => crate::error::ErrorKind::User,
|
||||
ConsoleError {
|
||||
http_status_code: http::StatusCode::UNPROCESSABLE_ENTITY,
|
||||
error,
|
||||
..
|
||||
} if error.contains(
|
||||
"compute time quota of non-primary branches is exceeded",
|
||||
) =>
|
||||
{
|
||||
crate::error::ErrorKind::User
|
||||
}
|
||||
ConsoleError {
|
||||
http_status_code: http::StatusCode::LOCKED,
|
||||
error,
|
||||
..
|
||||
} if error.contains("quota exceeded")
|
||||
|| error.contains("the limit for current plan reached") =>
|
||||
{
|
||||
crate::error::ErrorKind::User
|
||||
}
|
||||
ConsoleError {
|
||||
http_status_code: http::StatusCode::TOO_MANY_REQUESTS,
|
||||
..
|
||||
} => crate::error::ErrorKind::ServiceRateLimit,
|
||||
ConsoleError { .. } => crate::error::ErrorKind::ControlPlane,
|
||||
},
|
||||
}
|
||||
}
|
||||
ApiError::Console {
|
||||
status: http::StatusCode::LOCKED,
|
||||
text,
|
||||
} if text.contains("quota exceeded")
|
||||
|| text.contains("the limit for current plan reached") =>
|
||||
{
|
||||
crate::error::ErrorKind::User
|
||||
}
|
||||
ApiError::Console {
|
||||
status: http::StatusCode::TOO_MANY_REQUESTS,
|
||||
..
|
||||
} => crate::error::ErrorKind::ServiceRateLimit,
|
||||
ApiError::Console { .. } => crate::error::ErrorKind::ControlPlane,
|
||||
ApiError::Transport(_) => crate::error::ErrorKind::ControlPlane,
|
||||
}
|
||||
}
|
||||
@@ -124,31 +133,7 @@ pub mod errors {
|
||||
match self {
|
||||
// retry some transport errors
|
||||
Self::Transport(io) => io.could_retry(),
|
||||
// retry some temporary failures because the compute was in a bad state
|
||||
// (bad request can be returned when the endpoint was in transition)
|
||||
Self::Console {
|
||||
status: http::StatusCode::BAD_REQUEST,
|
||||
..
|
||||
} => true,
|
||||
// don't retry when quotas are exceeded
|
||||
Self::Console {
|
||||
status: http::StatusCode::UNPROCESSABLE_ENTITY,
|
||||
ref text,
|
||||
} => !text.contains("compute time quota of non-primary branches is exceeded"),
|
||||
// locked can be returned when the endpoint was in transition
|
||||
// or when quotas are exceeded. don't retry when quotas are exceeded
|
||||
Self::Console {
|
||||
status: http::StatusCode::LOCKED,
|
||||
ref text,
|
||||
} => {
|
||||
// written data quota exceeded
|
||||
// data transfer quota exceeded
|
||||
// compute time quota exceeded
|
||||
// logical size quota exceeded
|
||||
!text.contains("quota exceeded")
|
||||
&& !text.contains("the limit for current plan reached")
|
||||
}
|
||||
_ => false,
|
||||
Self::Console(e) => e.could_retry(),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -509,7 +494,7 @@ impl<K: Hash + Eq + Clone> ApiLocks<K> {
|
||||
self.metrics
|
||||
.semaphore_acquire_seconds
|
||||
.observe(now.elapsed().as_secs_f64());
|
||||
|
||||
info!("acquired permit {:?}", now.elapsed().as_secs_f64());
|
||||
Ok(WakeComputePermit { permit: permit? })
|
||||
}
|
||||
|
||||
|
||||
@@ -94,12 +94,14 @@ impl Api {
        let body = match parse_body::<GetRoleSecret>(response).await {
            Ok(body) => body,
            // Error 404 is special: it's ok not to have a secret.
-            Err(e) => match e.http_status_code() {
-                Some(http::StatusCode::NOT_FOUND) => {
-                    // TODO(anna): retry
+            Err(e) => {
+                if e.get_reason().is_not_found() {
                    return Ok(AuthInfo::default());
+                } else {
+                    return Err(e.into());
                }
-                _otherwise => return Err(e.into()),
-            },
+            }
        };

        let secret = if body.role_secret.is_empty() {
@@ -328,19 +330,24 @@ async fn parse_body<T: for<'a> serde::Deserialize<'a>>(
        info!("request succeeded, processing the body");
        return Ok(response.json().await?);
    }
+    let s = response.bytes().await?;
+    // Log plaintext to be able to detect, whether there are some cases not covered by the error struct.
+    info!("response_error plaintext: {:?}", s);

    // Don't throw an error here because it's not as important
    // as the fact that the request itself has failed.
-    let body = response.json().await.unwrap_or_else(|e| {
+    let mut body = serde_json::from_slice(&s).unwrap_or_else(|e| {
        warn!("failed to parse error body: {e}");
        ConsoleError {
            error: "reason unclear (malformed error message)".into(),
+            http_status_code: status,
+            status: None,
        }
    });
+    body.http_status_code = status;

-    let text = body.error;
-    error!("console responded with an error ({status}): {text}");
-    Err(ApiError::Console { status, text })
+    error!("console responded with an error ({status}): {body:?}");
+    Err(ApiError::Console(body))
}

fn parse_host_port(input: &str) -> Option<(&str, u16)> {
@@ -12,7 +12,7 @@ use crate::auth::backend::{
|
||||
};
|
||||
use crate::config::{CertResolver, RetryConfig};
|
||||
use crate::console::caches::NodeInfoCache;
|
||||
use crate::console::messages::MetricsAuxInfo;
|
||||
use crate::console::messages::{ConsoleError, MetricsAuxInfo};
|
||||
use crate::console::provider::{CachedAllowedIps, CachedRoleSecret, ConsoleBackend};
|
||||
use crate::console::{self, CachedNodeInfo, NodeInfo};
|
||||
use crate::error::ErrorKind;
|
||||
@@ -484,18 +484,20 @@ impl TestBackend for TestConnectMechanism {
|
||||
match action {
|
||||
ConnectAction::Wake => Ok(helper_create_cached_node_info(self.cache)),
|
||||
ConnectAction::WakeFail => {
|
||||
let err = console::errors::ApiError::Console {
|
||||
status: http::StatusCode::FORBIDDEN,
|
||||
text: "TEST".into(),
|
||||
};
|
||||
let err = console::errors::ApiError::Console(ConsoleError {
|
||||
http_status_code: http::StatusCode::FORBIDDEN,
|
||||
error: "TEST".into(),
|
||||
status: None,
|
||||
});
|
||||
assert!(!err.could_retry());
|
||||
Err(console::errors::WakeComputeError::ApiError(err))
|
||||
}
|
||||
ConnectAction::WakeRetry => {
|
||||
let err = console::errors::ApiError::Console {
|
||||
status: http::StatusCode::BAD_REQUEST,
|
||||
text: "TEST".into(),
|
||||
};
|
||||
let err = console::errors::ApiError::Console(ConsoleError {
|
||||
http_status_code: http::StatusCode::BAD_REQUEST,
|
||||
error: "TEST".into(),
|
||||
status: None,
|
||||
});
|
||||
assert!(err.could_retry());
|
||||
Err(console::errors::WakeComputeError::ApiError(err))
|
||||
}
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
use crate::config::RetryConfig;
|
||||
use crate::console::messages::ConsoleError;
|
||||
use crate::console::{errors::WakeComputeError, provider::CachedNodeInfo};
|
||||
use crate::context::RequestMonitoring;
|
||||
use crate::metrics::{
|
||||
@@ -88,36 +89,76 @@ fn report_error(e: &WakeComputeError, retry: bool) {
|
||||
let kind = match e {
|
||||
WakeComputeError::BadComputeAddress(_) => WakeupFailureKind::BadComputeAddress,
|
||||
WakeComputeError::ApiError(ApiError::Transport(_)) => WakeupFailureKind::ApiTransportError,
|
||||
WakeComputeError::ApiError(ApiError::Console {
|
||||
status: StatusCode::LOCKED,
|
||||
ref text,
|
||||
}) if text.contains("written data quota exceeded")
|
||||
|| text.contains("the limit for current plan reached") =>
|
||||
{
|
||||
WakeupFailureKind::QuotaExceeded
|
||||
}
|
||||
WakeComputeError::ApiError(ApiError::Console {
|
||||
status: StatusCode::UNPROCESSABLE_ENTITY,
|
||||
ref text,
|
||||
}) if text.contains("compute time quota of non-primary branches is exceeded") => {
|
||||
WakeupFailureKind::QuotaExceeded
|
||||
}
|
||||
WakeComputeError::ApiError(ApiError::Console {
|
||||
status: StatusCode::LOCKED,
|
||||
..
|
||||
}) => WakeupFailureKind::ApiConsoleLocked,
|
||||
WakeComputeError::ApiError(ApiError::Console {
|
||||
status: StatusCode::BAD_REQUEST,
|
||||
..
|
||||
}) => WakeupFailureKind::ApiConsoleBadRequest,
|
||||
WakeComputeError::ApiError(ApiError::Console { status, .. })
|
||||
if status.is_server_error() =>
|
||||
{
|
||||
WakeupFailureKind::ApiConsoleOtherServerError
|
||||
}
|
||||
WakeComputeError::ApiError(ApiError::Console { .. }) => {
|
||||
WakeupFailureKind::ApiConsoleOtherError
|
||||
}
|
||||
WakeComputeError::ApiError(ApiError::Console(e)) => match e.get_reason() {
|
||||
crate::console::messages::Reason::RoleProtected => {
|
||||
WakeupFailureKind::ApiConsoleBadRequest
|
||||
}
|
||||
crate::console::messages::Reason::ResourceNotFound => {
|
||||
WakeupFailureKind::ApiConsoleBadRequest
|
||||
}
|
||||
crate::console::messages::Reason::ProjectNotFound => {
|
||||
WakeupFailureKind::ApiConsoleBadRequest
|
||||
}
|
||||
crate::console::messages::Reason::EndpointNotFound => {
|
||||
WakeupFailureKind::ApiConsoleBadRequest
|
||||
}
|
||||
crate::console::messages::Reason::BranchNotFound => {
|
||||
WakeupFailureKind::ApiConsoleBadRequest
|
||||
}
|
||||
crate::console::messages::Reason::RateLimitExceeded => {
|
||||
WakeupFailureKind::ApiConsoleLocked
|
||||
}
|
||||
crate::console::messages::Reason::NonPrimaryBranchComputeTimeExceeded => {
|
||||
WakeupFailureKind::QuotaExceeded
|
||||
}
|
||||
crate::console::messages::Reason::ActiveTimeQuotaExceeded => {
|
||||
WakeupFailureKind::QuotaExceeded
|
||||
}
|
||||
crate::console::messages::Reason::ComputeTimeQuotaExceeded => {
|
||||
WakeupFailureKind::QuotaExceeded
|
||||
}
|
||||
crate::console::messages::Reason::WrittenDataQuotaExceeded => {
|
||||
WakeupFailureKind::QuotaExceeded
|
||||
}
|
||||
crate::console::messages::Reason::DataTransferQuotaExceeded => {
|
||||
WakeupFailureKind::QuotaExceeded
|
||||
}
|
||||
crate::console::messages::Reason::LogicalSizeQuotaExceeded => {
|
||||
WakeupFailureKind::QuotaExceeded
|
||||
}
|
||||
crate::console::messages::Reason::Unknown => match e {
|
||||
ConsoleError {
|
||||
http_status_code: StatusCode::LOCKED,
|
||||
ref error,
|
||||
..
|
||||
} if error.contains("written data quota exceeded")
|
||||
|| error.contains("the limit for current plan reached") =>
|
||||
{
|
||||
WakeupFailureKind::QuotaExceeded
|
||||
}
|
||||
ConsoleError {
|
||||
http_status_code: StatusCode::UNPROCESSABLE_ENTITY,
|
||||
ref error,
|
||||
..
|
||||
} if error.contains("compute time quota of non-primary branches is exceeded") => {
|
||||
WakeupFailureKind::QuotaExceeded
|
||||
}
|
||||
ConsoleError {
|
||||
http_status_code: StatusCode::LOCKED,
|
||||
..
|
||||
} => WakeupFailureKind::ApiConsoleLocked,
|
||||
ConsoleError {
|
||||
http_status_code: StatusCode::BAD_REQUEST,
|
||||
..
|
||||
} => WakeupFailureKind::ApiConsoleBadRequest,
|
||||
ConsoleError {
|
||||
http_status_code, ..
|
||||
} if http_status_code.is_server_error() => {
|
||||
WakeupFailureKind::ApiConsoleOtherServerError
|
||||
}
|
||||
ConsoleError { .. } => WakeupFailureKind::ApiConsoleOtherError,
|
||||
},
|
||||
},
|
||||
WakeComputeError::TooManyConnections => WakeupFailureKind::ApiConsoleLocked,
|
||||
WakeComputeError::TooManyConnectionAttempts(_) => WakeupFailureKind::TimeoutError,
|
||||
};
|
||||
|
||||
@@ -1,5 +1,3 @@
-use std::usize;
-
 use super::{LimitAlgorithm, Outcome, Sample};

 /// Loss-based congestion avoidance.
@@ -32,8 +32,6 @@ pub struct ClientFirstMessage<'a> {
|
||||
pub bare: &'a str,
|
||||
/// Channel binding mode.
|
||||
pub cbind_flag: ChannelBinding<&'a str>,
|
||||
/// (Client username)[<https://github.com/postgres/postgres/blob/94226d4506e66d6e7cbf/src/backend/libpq/auth-scram.c#L13>].
|
||||
pub username: &'a str,
|
||||
/// Client nonce.
|
||||
pub nonce: &'a str,
|
||||
}
|
||||
@@ -58,6 +56,14 @@ impl<'a> ClientFirstMessage<'a> {
|
||||
|
||||
// In theory, these might be preceded by "reserved-mext" (i.e. "m=")
|
||||
let username = parts.next()?.strip_prefix("n=")?;
|
||||
|
||||
// https://github.com/postgres/postgres/blob/f83908798f78c4cafda217ca875602c88ea2ae28/src/backend/libpq/auth-scram.c#L13-L14
|
||||
if !username.is_empty() {
|
||||
tracing::warn!(username, "scram username provided, but is not expected")
|
||||
// TODO(conrad):
|
||||
// return None;
|
||||
}
|
||||
|
||||
let nonce = parts.next()?.strip_prefix("r=")?;
|
||||
|
||||
// Validate but ignore auth extensions
|
||||
@@ -66,7 +72,6 @@ impl<'a> ClientFirstMessage<'a> {
|
||||
Some(Self {
|
||||
bare,
|
||||
cbind_flag,
|
||||
username,
|
||||
nonce,
|
||||
})
|
||||
}
|
||||
@@ -188,19 +193,18 @@ mod tests {
|
||||
|
||||
// (Almost) real strings captured during debug sessions
|
||||
let cases = [
|
||||
(NotSupportedClient, "n,,n=pepe,r=t8JwklwKecDLwSsA72rHmVju"),
|
||||
(NotSupportedServer, "y,,n=pepe,r=t8JwklwKecDLwSsA72rHmVju"),
|
||||
(NotSupportedClient, "n,,n=,r=t8JwklwKecDLwSsA72rHmVju"),
|
||||
(NotSupportedServer, "y,,n=,r=t8JwklwKecDLwSsA72rHmVju"),
|
||||
(
|
||||
Required("tls-server-end-point"),
|
||||
"p=tls-server-end-point,,n=pepe,r=t8JwklwKecDLwSsA72rHmVju",
|
||||
"p=tls-server-end-point,,n=,r=t8JwklwKecDLwSsA72rHmVju",
|
||||
),
|
||||
];
|
||||
|
||||
for (cb, input) in cases {
|
||||
let msg = ClientFirstMessage::parse(input).unwrap();
|
||||
|
||||
assert_eq!(msg.bare, "n=pepe,r=t8JwklwKecDLwSsA72rHmVju");
|
||||
assert_eq!(msg.username, "pepe");
|
||||
assert_eq!(msg.bare, "n=,r=t8JwklwKecDLwSsA72rHmVju");
|
||||
assert_eq!(msg.nonce, "t8JwklwKecDLwSsA72rHmVju");
|
||||
assert_eq!(msg.cbind_flag, cb);
|
||||
}
|
||||
@@ -208,14 +212,13 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn parse_client_first_message_with_invalid_gs2_authz() {
|
||||
assert!(ClientFirstMessage::parse("n,authzid,n=user,r=nonce").is_none())
|
||||
assert!(ClientFirstMessage::parse("n,authzid,n=,r=nonce").is_none())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_client_first_message_with_extra_params() {
|
||||
let msg = ClientFirstMessage::parse("n,,n=user,r=nonce,a=foo,b=bar,c=baz").unwrap();
|
||||
assert_eq!(msg.bare, "n=user,r=nonce,a=foo,b=bar,c=baz");
|
||||
assert_eq!(msg.username, "user");
|
||||
let msg = ClientFirstMessage::parse("n,,n=,r=nonce,a=foo,b=bar,c=baz").unwrap();
|
||||
assert_eq!(msg.bare, "n=,r=nonce,a=foo,b=bar,c=baz");
|
||||
assert_eq!(msg.nonce, "nonce");
|
||||
assert_eq!(msg.cbind_flag, ChannelBinding::NotSupportedClient);
|
||||
}
|
||||
@@ -223,9 +226,9 @@ mod tests {
|
||||
#[test]
|
||||
fn parse_client_first_message_with_extra_params_invalid() {
|
||||
// must be of the form `<ascii letter>=<...>`
|
||||
assert!(ClientFirstMessage::parse("n,,n=user,r=nonce,abc=foo").is_none());
|
||||
assert!(ClientFirstMessage::parse("n,,n=user,r=nonce,1=foo").is_none());
|
||||
assert!(ClientFirstMessage::parse("n,,n=user,r=nonce,a").is_none());
|
||||
assert!(ClientFirstMessage::parse("n,,n=,r=nonce,abc=foo").is_none());
|
||||
assert!(ClientFirstMessage::parse("n,,n=,r=nonce,1=foo").is_none());
|
||||
assert!(ClientFirstMessage::parse("n,,n=,r=nonce,a").is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
||||
@@ -1,5 +1,5 @@
 [toolchain]
-channel = "1.78.0"
+channel = "1.79.0"
 profile = "default"
 # The default profile includes rustc, rust-std, cargo, rust-docs, rustfmt and clippy.
 # https://rust-lang.github.io/rustup/concepts/profiles.html
scripts/plot-compression-report.py (new executable file, 63 lines)
@@ -0,0 +1,63 @@
#!/usr/bin/env -S python3 -u

import argparse
import json
import os
from pprint import pprint

import matplotlib.pyplot as plt

parser = argparse.ArgumentParser(prog="compression-report")
parser.add_argument("dir")
args = parser.parse_args()

files = []
for file_name in os.listdir(args.dir):
    if not file_name.endswith(".json"):
        continue
    file_path = os.path.join(args.dir, file_name)
    with open(file_path) as json_str:
        json_data = json.load(json_str)
        files.append((file_name, json_data))
# pprint(files)

extra_zstd_lines = True
dc = 2  # data column to use (1 for sizes, 2 for time)
sort_by = "ZstdHigh"
files.sort(key=lambda file_data: [x for x in file_data[1] if x[0] == sort_by][0][dc])


x_axis = []
data_baseline = []
data_lz4 = []
data_zstd = []
data_zstd_low = []
data_zstd_high = []

for idx, f in enumerate(files):
    file_data = f[1]
    # pprint(file_data)

    x_axis.append(idx)
    data_baseline.append([x for x in file_data if x[0] is None][0][dc])
    data_lz4.append([x for x in file_data if x[0] == "LZ4"][0][dc])
    data_zstd.append([x for x in file_data if x[0] == "Zstd"][0][dc])
    if extra_zstd_lines:
        data_zstd_low.append([x for x in file_data if x[0] == "ZstdLow"][0][dc])
        data_zstd_high.append([x for x in file_data if x[0] == "ZstdHigh"][0][dc])

plt.plot(x_axis, data_baseline, "x", markeredgewidth=2, label="baseline")
plt.plot(x_axis, data_lz4, "x", markeredgewidth=2, label="lz4")
plt.plot(x_axis, data_zstd, "x", markeredgewidth=2, label="Zstd")
if extra_zstd_lines:
    plt.plot(x_axis, data_zstd_low, "x", markeredgewidth=2, label="ZstdLow")
    plt.plot(x_axis, data_zstd_high, "x", markeredgewidth=2, label="ZstdHigh")

# plt.style.use('_mpl-gallery')
plt.ylim(bottom=0)
plt.legend(loc="upper left")

figure_path = os.path.join(args.dir, "figure.png")
print(f"saving figure to {figure_path}")
plt.savefig(figure_path)
plt.show()
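The exact report format emitted by the compression benchmark is not shown in this diff; judging only by how the script indexes its input, each JSON file appears to hold a list of rows `[method, size, time]` with `None` as the uncompressed baseline. A small sketch of a compatible input file, with made-up numbers:

```python
import json

# Hypothetical per-layer report matching the indexing used by plot-compression-report.py:
# column 1 is size, column 2 is time, method None is the baseline.
report = [
    [None, 8_192_000, 0.00],
    ["LZ4", 5_600_000, 0.21],
    ["Zstd", 4_100_000, 0.55],
    ["ZstdLow", 4_600_000, 0.30],
    ["ZstdHigh", 3_900_000, 1.40],
]

with open("report-0001.json", "w") as f:
    json.dump(report, f)
# Running `plot-compression-report.py <dir>` on a directory of such files
# would then plot column 2 (time) per method, sorted by the ZstdHigh column.
```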
@@ -31,6 +31,7 @@ pub(crate) enum PageserverState {
|
||||
Available {
|
||||
last_seen_at: Instant,
|
||||
utilization: PageserverUtilization,
|
||||
new: bool,
|
||||
},
|
||||
Offline,
|
||||
}
|
||||
@@ -127,6 +128,7 @@ impl HeartbeaterTask {
|
||||
heartbeat_futs.push({
|
||||
let jwt_token = self.jwt_token.clone();
|
||||
let cancel = self.cancel.clone();
|
||||
let new_node = !self.state.contains_key(node_id);
|
||||
|
||||
// Clone the node and mark it as available such that the request
|
||||
// goes through to the pageserver even when the node is marked offline.
|
||||
@@ -159,6 +161,7 @@ impl HeartbeaterTask {
|
||||
PageserverState::Available {
|
||||
last_seen_at: Instant::now(),
|
||||
utilization,
|
||||
new: new_node,
|
||||
}
|
||||
} else {
|
||||
PageserverState::Offline
|
||||
@@ -220,6 +223,7 @@ impl HeartbeaterTask {
|
||||
}
|
||||
},
|
||||
Vacant(_) => {
|
||||
// This is a new node. Don't generate a delta for it.
|
||||
deltas.push((node_id, ps_state.clone()));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3,7 +3,7 @@ use std::{str::FromStr, time::Duration};
 use pageserver_api::{
     controller_api::{
         NodeAvailability, NodeDescribeResponse, NodeRegisterRequest, NodeSchedulingPolicy,
-        TenantLocateResponseShard,
+        TenantLocateResponseShard, UtilizationScore,
     },
     shard::TenantShardId,
 };
@@ -116,6 +116,16 @@ impl Node {
         match (self.availability, availability) {
             (Offline, Active(_)) => ToActive,
             (Active(_), Offline) => ToOffline,
+            // Consider the case when the storage controller handles the re-attach of a node
+            // before the heartbeats detect that the node is back online. We still need
+            // [`Service::node_configure`] to attempt reconciliations for shards with an
+            // unknown observed location.
+            // The unsavoury match arm below handles this situation.
+            (Active(lhs), Active(rhs))
+                if lhs == UtilizationScore::worst() && rhs < UtilizationScore::worst() =>
+            {
+                ToActive
+            }
             _ => Unchanged,
         }
     }
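A minimal, illustrative restatement of that transition rule in Python, useful for eyeballing the truth table. The names and the `WORST` constant are ad hoc, not the controller's actual API.

```python
WORST = 100  # hypothetical "worst" utilization score


def availability_transition(old, new):
    """old/new are ("offline",) or ("active", score); returns the transition kind."""
    if old[0] == "offline" and new[0] == "active":
        return "to_active"
    if old[0] == "active" and new[0] == "offline":
        return "to_offline"
    # Re-attach handled before the heartbeats: a node registered with the worst
    # score that now reports a real score should still trigger reconciliation.
    if old[0] == "active" and new[0] == "active" and old[1] == WORST and new[1] < WORST:
        return "to_active"
    return "unchanged"


assert availability_transition(("active", WORST), ("active", 10)) == "to_active"
assert availability_transition(("active", 10), ("active", 20)) == "unchanged"
```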
@@ -12,7 +12,7 @@ use crate::{
|
||||
id_lock_map::{trace_exclusive_lock, trace_shared_lock, IdLockMap, WrappedWriteGuard},
|
||||
persistence::{AbortShardSplitStatus, TenantFilter},
|
||||
reconciler::{ReconcileError, ReconcileUnits},
|
||||
scheduler::{ScheduleContext, ScheduleMode},
|
||||
scheduler::{MaySchedule, ScheduleContext, ScheduleMode},
|
||||
tenant_shard::{
|
||||
MigrateAttachment, ReconcileNeeded, ScheduleOptimization, ScheduleOptimizationAction,
|
||||
},
|
||||
@@ -747,29 +747,61 @@ impl Service {
|
||||
let res = self.heartbeater.heartbeat(nodes).await;
|
||||
if let Ok(deltas) = res {
|
||||
for (node_id, state) in deltas.0 {
|
||||
let new_availability = match state {
|
||||
PageserverState::Available { utilization, .. } => NodeAvailability::Active(
|
||||
UtilizationScore(utilization.utilization_score),
|
||||
let (new_node, new_availability) = match state {
|
||||
PageserverState::Available {
|
||||
utilization, new, ..
|
||||
} => (
|
||||
new,
|
||||
NodeAvailability::Active(UtilizationScore(
|
||||
utilization.utilization_score,
|
||||
)),
|
||||
),
|
||||
PageserverState::Offline => NodeAvailability::Offline,
|
||||
PageserverState::Offline => (false, NodeAvailability::Offline),
|
||||
};
|
||||
let res = self
|
||||
.node_configure(node_id, Some(new_availability), None)
|
||||
.await;
|
||||
|
||||
match res {
|
||||
Ok(()) => {}
|
||||
Err(ApiError::NotFound(_)) => {
|
||||
// This should be rare, but legitimate since the heartbeats are done
|
||||
// on a snapshot of the nodes.
|
||||
tracing::info!("Node {} was not found after heartbeat round", node_id);
|
||||
if new_node {
|
||||
// When the heartbeats detect a newly added node, we don't wish
|
||||
// to attempt to reconcile the shards assigned to it. The node
|
||||
// is likely handling it's re-attach response, so reconciling now
|
||||
// would be counterproductive.
|
||||
//
|
||||
// Instead, update the in-memory state with the details learned about the
|
||||
// node.
|
||||
let mut locked = self.inner.write().unwrap();
|
||||
let (nodes, _tenants, scheduler) = locked.parts_mut();
|
||||
|
||||
let mut new_nodes = (**nodes).clone();
|
||||
|
||||
if let Some(node) = new_nodes.get_mut(&node_id) {
|
||||
node.set_availability(new_availability);
|
||||
scheduler.node_upsert(node);
|
||||
}
|
||||
Err(err) => {
|
||||
tracing::error!(
|
||||
"Failed to update node {} after heartbeat round: {}",
|
||||
node_id,
|
||||
err
|
||||
);
|
||||
|
||||
locked.nodes = Arc::new(new_nodes);
|
||||
} else {
|
||||
// This is the code path for geniune availability transitions (i.e node
|
||||
// goes unavailable and/or comes back online).
|
||||
let res = self
|
||||
.node_configure(node_id, Some(new_availability), None)
|
||||
.await;
|
||||
|
||||
match res {
|
||||
Ok(()) => {}
|
||||
Err(ApiError::NotFound(_)) => {
|
||||
// This should be rare, but legitimate since the heartbeats are done
|
||||
// on a snapshot of the nodes.
|
||||
tracing::info!(
|
||||
"Node {} was not found after heartbeat round",
|
||||
node_id
|
||||
);
|
||||
}
|
||||
Err(err) => {
|
||||
tracing::error!(
|
||||
"Failed to update node {} after heartbeat round: {}",
|
||||
node_id,
|
||||
err
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -2409,11 +2441,17 @@ impl Service {
|
||||
(detach_waiters, shard_ids, node.clone())
|
||||
};
|
||||
|
||||
if let Err(e) = self.await_waiters(detach_waiters, RECONCILE_TIMEOUT).await {
|
||||
// Failing to detach shouldn't hold up deletion, e.g. if a node is offline we should be able
|
||||
// to use some other node to run the remote deletion.
|
||||
tracing::warn!("Failed to detach some locations: {e}");
|
||||
}
|
||||
// This reconcile wait can fail in a few ways:
|
||||
// A there is a very long queue for the reconciler semaphore
|
||||
// B some pageserver is failing to handle a detach promptly
|
||||
// C some pageserver goes offline right at the moment we send it a request.
|
||||
//
|
||||
// A and C are transient: the semaphore will eventually become available, and once a node is marked offline
|
||||
// the next attempt to reconcile will silently skip detaches for an offline node and succeed. If B happens,
|
||||
// it's a bug, and needs resolving at the pageserver level (we shouldn't just leave attachments behind while
|
||||
// deleting the underlying data).
|
||||
self.await_waiters(detach_waiters, RECONCILE_TIMEOUT)
|
||||
.await?;
|
||||
|
||||
let locations = shard_ids
|
||||
.into_iter()
|
||||
@@ -2431,13 +2469,11 @@ impl Service {
|
||||
for result in results {
|
||||
match result {
|
||||
Ok(StatusCode::ACCEPTED) => {
|
||||
// This could happen if we failed detach above, and hit a pageserver where the tenant
|
||||
// is still attached: it will accept the deletion in the background
|
||||
tracing::warn!(
|
||||
"Unexpectedly still attached on {}, client should retry",
|
||||
// This should never happen: we waited for detaches to finish above
|
||||
return Err(ApiError::InternalServerError(anyhow::anyhow!(
|
||||
"Unexpectedly still attached on {}",
|
||||
node
|
||||
);
|
||||
return Ok(StatusCode::ACCEPTED);
|
||||
)));
|
||||
}
|
||||
Ok(_) => {}
|
||||
Err(mgmt_api::Error::Cancelled) => {
|
||||
@@ -4312,6 +4348,16 @@ impl Service {
|
||||
continue;
|
||||
}
|
||||
|
||||
if !new_nodes
|
||||
.values()
|
||||
.any(|n| matches!(n.may_schedule(), MaySchedule::Yes(_)))
|
||||
{
|
||||
// Special case for when all nodes are unavailable and/or unschedulable: there is no point
|
||||
// trying to reschedule since there's nowhere else to go. Without this
|
||||
// branch we incorrectly detach tenants in response to node unavailability.
|
||||
continue;
|
||||
}
|
||||
|
||||
if tenant_shard.intent.demote_attached(scheduler, node_id) {
|
||||
tenant_shard.sequence = tenant_shard.sequence.next();
|
||||
|
||||
@@ -4349,6 +4395,12 @@ impl Service {
|
||||
// When a node comes back online, we must reconcile any tenant that has a None observed
|
||||
// location on the node.
|
||||
for tenant_shard in locked.tenants.values_mut() {
|
||||
// If a reconciliation is already in progress, rely on the previous scheduling
|
||||
// decision and skip triggering a new reconciliation.
|
||||
if tenant_shard.reconciler.is_some() {
|
||||
continue;
|
||||
}
|
||||
|
||||
if let Some(observed_loc) = tenant_shard.observed.locations.get_mut(&node_id) {
|
||||
if observed_loc.conf.is_none() {
|
||||
self.maybe_reconcile_shard(tenant_shard, &new_nodes);
|
||||
|
||||
@@ -94,8 +94,6 @@ DEFAULT_PAGESERVER_ALLOWED_ERRORS = (
     ".*WARN.*path=/v1/utilization .*request was dropped before completing",
     # Can happen during shutdown
     ".*scheduling deletion on drop failed: queue is in state Stopped.*",
-    # Can happen during shutdown
-    ".*ignoring failure to find gc cutoffs: timeline shutting down.*",
 )
test_runner/performance/pgvector/halfvec_build.sql (new file, 15 lines)
@@ -0,0 +1,15 @@
DROP TABLE IF EXISTS halfvec_test_table;

CREATE TABLE halfvec_test_table (
    _id text NOT NULL,
    title text,
    text text,
    embeddings halfvec(1536),
    PRIMARY KEY (_id)
);

INSERT INTO halfvec_test_table (_id, title, text, embeddings)
SELECT _id, title, text, embeddings::halfvec
FROM documents;

CREATE INDEX documents_half_precision_hnsw_idx ON halfvec_test_table USING hnsw (embeddings halfvec_cosine_ops) WITH (m = 64, ef_construction = 128);
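For intuition on why the half-precision variant is worth benchmarking: pgvector's `halfvec` stores 2 bytes per dimension versus 4 for `vector`, so the raw embedding payload roughly halves. A quick back-of-the-envelope check in Python (per-row header and TOAST overhead ignored):

```python
DIMS = 1536
ROWS = 1_000_000

vector_bytes_per_row = 4 * DIMS   # single-precision float per dimension
halfvec_bytes_per_row = 2 * DIMS  # half-precision float per dimension

print(vector_bytes_per_row, halfvec_bytes_per_row)  # 6144 vs 3072 bytes per row
saved_gib = ROWS * (vector_bytes_per_row - halfvec_bytes_per_row) / 1024**3
print(f"~{saved_gib:.1f} GiB saved on {ROWS:,} rows")  # roughly 2.9 GiB
```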
@@ -0,0 +1,13 @@
-- run with pooled connection
-- pgbench -T 300 -c 100 -j20 -f pgbench_halfvec_queries.sql "postgresql://neondb_owner:<secret>@ep-floral-thunder-w1gzhaxi-pooler.eu-west-1.aws.neon.build/neondb?sslmode=require"

with x (x) as (
    select "embeddings" as x
    from halfvec_test_table
    TABLESAMPLE SYSTEM (1)
    LIMIT 1
)
SELECT title, "embeddings" <=> (select x from x) as distance
FROM halfvec_test_table
ORDER BY 2
LIMIT 30;
@@ -1,13 +0,0 @@
-- run with pooled connection
-- pgbench -T 300 -c 100 -j20 -f pgbench_hnsw_queries.sql -postgresql://neondb_owner:<secret>@ep-floral-thunder-w1gzhaxi-pooler.eu-west-1.aws.neon.build/neondb?sslmode=require"

with x (x) as (
    select "embeddings" as x
    from hnsw_test_table
    TABLESAMPLE SYSTEM (1)
    LIMIT 1
)
SELECT title, "embeddings" <=> (select x from x) as distance
FROM hnsw_test_table
ORDER BY 2
LIMIT 30;
@@ -106,6 +106,7 @@ QUERIES: Tuple[LabelledQuery, ...] = (
 # Disable auto formatting for the list of queries so that it's easier to read
 # fmt: off
 PGVECTOR_QUERIES: Tuple[LabelledQuery, ...] = (
+    LabelledQuery("PGVPREP", r"ALTER EXTENSION VECTOR UPDATE;"),
     LabelledQuery("PGV0", r"DROP TABLE IF EXISTS hnsw_test_table;"),
     LabelledQuery("PGV1", r"CREATE TABLE hnsw_test_table AS TABLE documents WITH NO DATA;"),
     LabelledQuery("PGV2", r"INSERT INTO hnsw_test_table SELECT * FROM documents;"),
@@ -115,6 +116,10 @@ PGVECTOR_QUERIES: Tuple[LabelledQuery, ...] = (
     LabelledQuery("PGV6", r"CREATE INDEX ON hnsw_test_table USING hnsw (embeddings vector_l1_ops);"),
     LabelledQuery("PGV7", r"CREATE INDEX ON hnsw_test_table USING hnsw ((binary_quantize(embeddings)::bit(1536)) bit_hamming_ops);"),
     LabelledQuery("PGV8", r"CREATE INDEX ON hnsw_test_table USING hnsw ((binary_quantize(embeddings)::bit(1536)) bit_jaccard_ops);"),
+    LabelledQuery("PGV9", r"DROP TABLE IF EXISTS halfvec_test_table;"),
+    LabelledQuery("PGV10", r"CREATE TABLE halfvec_test_table (_id text NOT NULL, title text, text text, embeddings halfvec(1536), PRIMARY KEY (_id));"),
+    LabelledQuery("PGV11", r"INSERT INTO halfvec_test_table (_id, title, text, embeddings) SELECT _id, title, text, embeddings::halfvec FROM documents;"),
+    LabelledQuery("PGV12", r"CREATE INDEX documents_half_precision_hnsw_idx ON halfvec_test_table USING hnsw (embeddings halfvec_cosine_ops) WITH (m = 64, ef_construction = 128);"),
 )
 # fmt: on

@@ -18,6 +18,7 @@ class PgBenchLoadType(enum.Enum):
     SIMPLE_UPDATE = "simple-update"
     SELECT_ONLY = "select-only"
     PGVECTOR_HNSW = "pgvector-hnsw"
+    PGVECTOR_HALFVEC = "pgvector-halfvec"


 def utc_now_timestamp() -> int:
@@ -153,6 +154,26 @@ def run_test_pgbench(env: PgCompare, scale: int, duration: int, workload_type: P
             password=password,
         )

+    if workload_type == PgBenchLoadType.PGVECTOR_HALFVEC:
+        # Run the pgvector halfvec queries workload
+        run_pgbench(
+            env,
+            "pgvector-halfvec",
+            [
+                "pgbench",
+                "-f",
+                "test_runner/performance/pgvector/pgbench_custom_script_pgvector_halfvec_queries.sql",
+                "-c100",
+                "-j20",
+                f"-T{duration}",
+                "-P2",
+                "--protocol=prepared",
+                "--progress-timestamp",
+                connstr,
+            ],
+            password=password,
+        )
+
     env.report_size()


@@ -222,13 +243,3 @@ def test_pgbench_remote_simple_update(remote_compare: PgCompare, scale: int, dur
 @pytest.mark.remote_cluster
 def test_pgbench_remote_select_only(remote_compare: PgCompare, scale: int, duration: int):
     run_test_pgbench(remote_compare, scale, duration, PgBenchLoadType.SELECT_ONLY)
-
-
-# The following test runs on an existing database that has pgvector extension installed
-# and a table with 1 million embedding vectors loaded and indexed with HNSW.
-#
-# Run this pgbench tests against an existing remote Postgres cluster with the necessary setup.
-@pytest.mark.parametrize("duration", get_durations_matrix())
-@pytest.mark.remote_cluster
-def test_pgbench_remote_pgvector(remote_compare: PgCompare, duration: int):
-    run_test_pgbench(remote_compare, 1, duration, PgBenchLoadType.PGVECTOR_HNSW)
test_runner/performance/test_perf_pgvector_queries.py (new file, 24 lines)
@@ -0,0 +1,24 @@
import pytest
from fixtures.compare_fixtures import PgCompare

from performance.test_perf_pgbench import PgBenchLoadType, get_durations_matrix, run_test_pgbench


# The following test runs on an existing database that has pgvector extension installed
# and a table with 1 million embedding vectors loaded and indexed with HNSW.
#
# Run these pgbench tests against an existing remote Postgres cluster with the necessary setup.
@pytest.mark.parametrize("duration", get_durations_matrix())
@pytest.mark.remote_cluster
def test_pgbench_remote_pgvector_hnsw(remote_compare: PgCompare, duration: int):
    run_test_pgbench(remote_compare, 1, duration, PgBenchLoadType.PGVECTOR_HNSW)


# The following test runs on an existing database that has pgvector extension installed
# and a table with 1 million embedding vectors loaded and indexed with halfvec.
#
# Run these pgbench tests against an existing remote Postgres cluster with the necessary setup.
@pytest.mark.parametrize("duration", get_durations_matrix())
@pytest.mark.remote_cluster
def test_pgbench_remote_pgvector_halfvec(remote_compare: PgCompare, duration: int):
    run_test_pgbench(remote_compare, 1, duration, PgBenchLoadType.PGVECTOR_HALFVEC)
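As a rough usage illustration (not part of the change), this is the shape of the pgbench invocation the halfvec test ends up issuing; the connection string and duration below are placeholders, and the real values come from the test fixtures and `get_durations_matrix()`.

```python
import subprocess

connstr = "postgresql://user:password@host/dbname"  # placeholder, not a real endpoint
duration = 300  # seconds; supplied by the duration parametrization in the real test

subprocess.run(
    [
        "pgbench",
        "-f", "test_runner/performance/pgvector/pgbench_custom_script_pgvector_halfvec_queries.sql",
        "-c100", "-j20", f"-T{duration}", "-P2",
        "--protocol=prepared", "--progress-timestamp",
        connstr,
    ],
    check=True,
)
```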
@@ -300,7 +300,7 @@ def test_replica_query_race(neon_simple_env: NeonEnv):
|
||||
p_cur.execute("CREATE TABLE test AS SELECT 0 AS counter")
|
||||
|
||||
standby_ep = env.endpoints.new_replica_start(origin=primary_ep, endpoint_id="standby")
|
||||
time.sleep(1)
|
||||
wait_replica_caughtup(primary_ep, standby_ep)
|
||||
|
||||
# In primary, run a lot of UPDATEs on a single page
|
||||
finished = False
|
||||
|
||||
@@ -129,3 +129,33 @@ def test_ondemand_download_replica(neon_env_builder: NeonEnvBuilder, shard_count
|
||||
cur_replica = conn_replica.cursor()
|
||||
cur_replica.execute("SELECT * FROM clogtest")
|
||||
assert cur_replica.fetchall() == [(1,), (3,)]
|
||||
|
||||
|
||||
def test_ondemand_download_after_wal_switch(neon_env_builder: NeonEnvBuilder):
|
||||
"""
|
||||
Test on-demand SLRU download on standby, when starting right after
|
||||
WAL segment switch.
|
||||
|
||||
This is a repro for a bug in how the LSN at WAL page/segment
|
||||
boundary was handled (https://github.com/neondatabase/neon/issues/8030)
|
||||
"""
|
||||
|
||||
tenant_conf = {
|
||||
"lazy_slru_download": "true",
|
||||
}
|
||||
env = neon_env_builder.init_start(initial_tenant_conf=tenant_conf)
|
||||
|
||||
endpoint = env.endpoints.create_start("main")
|
||||
pg_conn = endpoint.connect()
|
||||
cur = pg_conn.cursor()
|
||||
|
||||
# Create a test table
|
||||
cur.execute("CREATE TABLE clogtest (id integer)")
|
||||
cur.execute("INSERT INTO clogtest VALUES (1)")
|
||||
|
||||
# Start standby at WAL segment boundary
|
||||
cur.execute("SELECT pg_switch_wal()")
|
||||
lsn = Lsn(query_scalar(cur, "SELECT pg_current_wal_insert_lsn()"))
|
||||
_endpoint_at_lsn = env.endpoints.create_start(
|
||||
branch_name="main", endpoint_id="ep-at-lsn", lsn=lsn
|
||||
)
|
||||
|
||||
@@ -133,6 +133,9 @@ def test_storage_controller_smoke(
|
||||
|
||||
wait_until(10, 1, lambda: node_evacuated(env.pageservers[0].id))
|
||||
|
||||
# Let all the reconciliations after marking the node offline complete
|
||||
env.storage_controller.reconcile_until_idle()
|
||||
|
||||
# Marking pageserver active should not migrate anything to it
|
||||
# immediately
|
||||
env.storage_controller.node_configure(env.pageservers[0].id, {"availability": "Active"})
|
||||
@@ -931,19 +934,27 @@ class Failure:
|
||||
def clear(self, env: NeonEnv):
|
||||
raise NotImplementedError()
|
||||
|
||||
def nodes(self):
|
||||
raise NotImplementedError()
|
||||
|
||||
|
||||
class NodeStop(Failure):
|
||||
def __init__(self, pageserver_id, immediate):
|
||||
self.pageserver_id = pageserver_id
|
||||
def __init__(self, pageserver_ids, immediate):
|
||||
self.pageserver_ids = pageserver_ids
|
||||
self.immediate = immediate
|
||||
|
||||
def apply(self, env: NeonEnv):
|
||||
pageserver = env.get_pageserver(self.pageserver_id)
|
||||
pageserver.stop(immediate=self.immediate)
|
||||
for ps_id in self.pageserver_ids:
|
||||
pageserver = env.get_pageserver(ps_id)
|
||||
pageserver.stop(immediate=self.immediate)
|
||||
|
||||
def clear(self, env: NeonEnv):
|
||||
pageserver = env.get_pageserver(self.pageserver_id)
|
||||
pageserver.start()
|
||||
for ps_id in self.pageserver_ids:
|
||||
pageserver = env.get_pageserver(ps_id)
|
||||
pageserver.start()
|
||||
|
||||
def nodes(self):
|
||||
return self.pageserver_ids
|
||||
|
||||
|
||||
class PageserverFailpoint(Failure):
|
||||
@@ -959,6 +970,9 @@ class PageserverFailpoint(Failure):
|
||||
pageserver = env.get_pageserver(self.pageserver_id)
|
||||
pageserver.http_client().configure_failpoints((self.failpoint, "off"))
|
||||
|
||||
def nodes(self):
|
||||
return [self.pageserver_id]
|
||||
|
||||
|
||||
def build_node_to_tenants_map(env: NeonEnv) -> dict[int, list[TenantId]]:
|
||||
tenants = env.storage_controller.tenant_list()
|
||||
@@ -982,8 +996,9 @@ def build_node_to_tenants_map(env: NeonEnv) -> dict[int, list[TenantId]]:
|
||||
@pytest.mark.parametrize(
|
||||
"failure",
|
||||
[
|
||||
NodeStop(pageserver_id=1, immediate=False),
|
||||
NodeStop(pageserver_id=1, immediate=True),
|
||||
NodeStop(pageserver_ids=[1], immediate=False),
|
||||
NodeStop(pageserver_ids=[1], immediate=True),
|
||||
NodeStop(pageserver_ids=[1, 2], immediate=True),
|
||||
PageserverFailpoint(pageserver_id=1, failpoint="get-utilization-http-handler"),
|
||||
],
|
||||
)
|
||||
@@ -1036,33 +1051,50 @@ def test_storage_controller_heartbeats(
|
||||
wait_until(10, 1, tenants_placed)
|
||||
|
||||
# ... then we apply the failure
|
||||
offline_node_id = failure.pageserver_id
|
||||
online_node_id = (set(range(1, len(env.pageservers) + 1)) - {offline_node_id}).pop()
|
||||
env.get_pageserver(offline_node_id).allowed_errors.append(
|
||||
# In the case of the failpoint failure, the impacted pageserver
|
||||
# still believes it has the tenant attached since location
|
||||
# config calls into it will fail due to being marked offline.
|
||||
".*Dropped remote consistent LSN updates.*",
|
||||
)
|
||||
offline_node_ids = set(failure.nodes())
|
||||
online_node_ids = set(range(1, len(env.pageservers) + 1)) - offline_node_ids
|
||||
|
||||
for node_id in offline_node_ids:
|
||||
env.get_pageserver(node_id).allowed_errors.append(
|
||||
# In the case of the failpoint failure, the impacted pageserver
|
||||
# still believes it has the tenant attached since location
|
||||
# config calls into it will fail due to being marked offline.
|
||||
".*Dropped remote consistent LSN updates.*",
|
||||
)
|
||||
|
||||
if len(offline_node_ids) > 1:
|
||||
env.get_pageserver(node_id).allowed_errors.append(
|
||||
".*Scheduling error when marking pageserver.*offline.*",
|
||||
)
|
||||
|
||||
failure.apply(env)
|
||||
|
||||
# ... expecting the heartbeats to mark it offline
|
||||
def node_offline():
|
||||
def nodes_offline():
|
||||
nodes = env.storage_controller.node_list()
|
||||
log.info(f"{nodes=}")
|
||||
target = next(n for n in nodes if n["id"] == offline_node_id)
|
||||
assert target["availability"] == "Offline"
|
||||
for node in nodes:
|
||||
if node["id"] in offline_node_ids:
|
||||
assert node["availability"] == "Offline"
|
||||
|
||||
# A node is considered offline if the last successful heartbeat
|
||||
# was more than 10 seconds ago (hardcoded in the storage controller).
|
||||
wait_until(20, 1, node_offline)
|
||||
wait_until(20, 1, nodes_offline)
|
||||
|
||||
# .. expecting the tenant on the offline node to be migrated
|
||||
def tenant_migrated():
|
||||
if len(online_node_ids) == 0:
|
||||
time.sleep(5)
|
||||
return
|
||||
|
||||
node_to_tenants = build_node_to_tenants_map(env)
|
||||
log.info(f"{node_to_tenants=}")
|
||||
assert set(node_to_tenants[online_node_id]) == set(tenant_ids)
|
||||
|
||||
observed_tenants = set()
|
||||
for node_id in online_node_ids:
|
||||
observed_tenants |= set(node_to_tenants[node_id])
|
||||
|
||||
assert observed_tenants == set(tenant_ids)
|
||||
|
||||
wait_until(10, 1, tenant_migrated)
|
||||
|
||||
@@ -1070,31 +1102,24 @@ def test_storage_controller_heartbeats(
|
||||
failure.clear(env)
|
||||
|
||||
# ... expecting the offline node to become active again
|
||||
def node_online():
|
||||
def nodes_online():
|
||||
nodes = env.storage_controller.node_list()
|
||||
target = next(n for n in nodes if n["id"] == offline_node_id)
|
||||
assert target["availability"] == "Active"
|
||||
for node in nodes:
|
||||
if node["id"] in online_node_ids:
|
||||
assert node["availability"] == "Active"
|
||||
|
||||
wait_until(10, 1, node_online)
|
||||
wait_until(10, 1, nodes_online)
|
||||
|
||||
time.sleep(5)
|
||||
|
||||
# ... then we create a new tenant
|
||||
tid = TenantId.generate()
|
||||
env.storage_controller.tenant_create(tid)
|
||||
|
||||
# ... expecting it to be placed on the node that just came back online
|
||||
tenants = env.storage_controller.tenant_list()
|
||||
newest_tenant = next(t for t in tenants if t["tenant_shard_id"] == str(tid))
|
||||
locations = list(newest_tenant["observed"]["locations"].keys())
|
||||
locations = [int(node_id) for node_id in locations]
|
||||
assert locations == [offline_node_id]
|
||||
node_to_tenants = build_node_to_tenants_map(env)
|
||||
log.info(f"Back online: {node_to_tenants=}")
|
||||
|
||||
# ... expecting the storage controller to reach a consistent state
|
||||
def storage_controller_consistent():
|
||||
env.storage_controller.consistency_check()
|
||||
|
||||
wait_until(10, 1, storage_controller_consistent)
|
||||
wait_until(30, 1, storage_controller_consistent)
|
||||
|
||||
|
||||
def test_storage_controller_re_attach(neon_env_builder: NeonEnvBuilder):
|
||||
|
||||
@@ -678,10 +678,6 @@ def test_synthetic_size_while_deleting(neon_env_builder: NeonEnvBuilder):
|
||||
with pytest.raises(PageserverApiException, match=matcher):
|
||||
completion.result()
|
||||
|
||||
# this happens on both cases
|
||||
env.pageserver.allowed_errors.append(
|
||||
".*ignoring failure to find gc cutoffs: timeline shutting down.*"
|
||||
)
|
||||
# this happens only in the case of deletion (http response logging)
|
||||
env.pageserver.allowed_errors.append(".*Failed to refresh gc_info before gathering inputs.*")
|
||||
|
||||
|
||||
@@ -287,13 +287,6 @@ def test_vm_bit_clear_on_heap_lock_blackbox(neon_env_builder: NeonEnvBuilder):
|
||||
# already truncated away.
|
||||
#
|
||||
# ERROR: could not access status of transaction 1027
|
||||
|
||||
# Debugging https://github.com/neondatabase/neon/issues/6967
|
||||
# the select() below fails occassionally at get_impl="vectored" validation
|
||||
env.pageserver.http_client().patch_tenant_config_client_side(
|
||||
tenant_id,
|
||||
{"test_vm_bit_debug_logging": True},
|
||||
)
|
||||
cur.execute("select xmin, xmax, * from vmtest_lock where id = 40000 for update")
|
||||
tup = cur.fetchall()
|
||||
log.info(f"tuple = {tup}")
|
||||
|
||||
@@ -601,13 +601,16 @@ async def run_segment_init_failure(env: NeonEnv):
|
||||
conn = await ep.connect_async()
|
||||
ep.safe_psql("select pg_switch_wal()") # jump to the segment boundary
|
||||
# next insertion should hang until failpoint is disabled.
|
||||
asyncio.create_task(conn.execute("insert into t select generate_series(1,1), 'payload'"))
|
||||
bg_query = asyncio.create_task(
|
||||
conn.execute("insert into t select generate_series(1,1), 'payload'")
|
||||
)
|
||||
sleep_sec = 2
|
||||
await asyncio.sleep(sleep_sec)
|
||||
# also restart ep at segment boundary to make test more interesting
|
||||
ep.stop()
|
||||
# it must still be not finished
|
||||
# assert not bg_query.done()
|
||||
assert not bg_query.done()
|
||||
# Also restart ep at segment boundary to make test more interesting. Do it in immediate mode;
|
||||
# fast will hang because it will try to gracefully finish sending WAL.
|
||||
ep.stop(mode="immediate")
|
||||
# Without segment rename during init (#6402) previous statement created
|
||||
# partially initialized 16MB segment, so sk restart also triggers #6401.
|
||||
sk.stop().start()
|
||||
|
||||
@@ -18,7 +18,7 @@ commands:
   - name: postgres-exporter
     user: nobody
     sysvInitAction: respawn
-    shell: 'DATA_SOURCE_NAME="user=cloud_admin sslmode=disable dbname=postgres" /bin/postgres_exporter'
+    shell: 'DATA_SOURCE_NAME="user=cloud_admin sslmode=disable dbname=postgres application_name=postgres-exporter" /bin/postgres_exporter'
   - name: sql-exporter
     user: nobody
     sysvInitAction: respawn
@@ -93,7 +93,7 @@ files:
      target:
        # Data source name always has a URI schema that matches the driver name. In some cases (e.g. MySQL)
        # the schema gets dropped or replaced to match the driver expected DSN format.
-        data_source_name: 'postgresql://cloud_admin@127.0.0.1:5432/postgres?sslmode=disable'
+        data_source_name: 'postgresql://cloud_admin@127.0.0.1:5432/postgres?sslmode=disable&application_name=sql_exporter'

        # Collectors (referenced by name) to execute on the target.
        # Glob patterns are supported (see <https://pkg.go.dev/path/filepath#Match> for syntax).
@@ -128,7 +128,7 @@ files:
      target:
        # Data source name always has a URI schema that matches the driver name. In some cases (e.g. MySQL)
        # the schema gets dropped or replaced to match the driver expected DSN format.
-        data_source_name: 'postgresql://cloud_admin@127.0.0.1:5432/postgres?sslmode=disable'
+        data_source_name: 'postgresql://cloud_admin@127.0.0.1:5432/postgres?sslmode=disable&application_name=sql_exporter_autoscaling'

        # Collectors (referenced by name) to execute on the target.
        # Glob patterns are supported (see <https://pkg.go.dev/path/filepath#Match> for syntax).
@@ -324,14 +324,15 @@ files:
          help: 'Whether or not the replication slot wal_status is lost'
          key_labels:
            - slot_name
-          values: [wal_status_is_lost]
+          values: [wal_is_lost]
          query: |
            SELECT slot_name,
                   CASE
                       WHEN wal_status = 'lost' THEN 1
                       ELSE 0
-                  END AS wal_status_is_lost
+                  END AS wal_is_lost
            FROM pg_replication_slots;

    - filename: neon_collector_autoscaling.yml
      content: |
        collector_name: neon_collector_autoscaling
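A quick way to confirm the exporters now identify themselves after this change is to look at `application_name` in `pg_stat_activity`. A sketch, assuming psycopg2 is available; the DSN is a placeholder for the local `cloud_admin` connection inside the compute.

```python
import psycopg2

# Placeholder DSN; inside a compute this would be the local cloud_admin connection.
conn = psycopg2.connect("host=127.0.0.1 port=5432 dbname=postgres user=cloud_admin")
with conn, conn.cursor() as cur:
    cur.execute(
        "SELECT application_name, count(*) FROM pg_stat_activity "
        "WHERE application_name LIKE '%exporter%' GROUP BY 1"
    )
    for app_name, n in cur.fetchall():
        # Expect postgres-exporter, sql_exporter, and sql_exporter_autoscaling to show up.
        print(app_name, n)
conn.close()
```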