Mirror of https://github.com/neondatabase/neon.git (synced 2026-01-17 10:22:56 +00:00)

Compare commits: problame/f...wp-mref (34 commits)
Commits compared (SHA1): 73a525de62, 62c0c1d797, 4feb6ba29c, 29a41fc7b9, d8b2a49c55,
ed9ffb9af2, 6c6a7f9ace, e729f28205, b6e1c09c73, 16d80128ee, 2ba414525e, 46210035c5,
81892199f6, 83eb02b07a, a71f58e69c, e6eb0020a1, eb0ca9b648, 6843fd8f89, edc900028e,
789196572e, 425eed24e8, f67010109f, 0c3e3a8667, 82719542c6, d25f7e3dd5, fbccd1e676,
dc2ab4407f, ad0ab3b81b, 836d1f4af7, 9dda13ecce, 9ba9f32dfe, 3099e1a787, f749437cec,
0a256148b0
.github/workflows/benchmarking.yml (vendored, 8 changed lines)

@@ -99,7 +99,7 @@ jobs:
# Set --sparse-ordering option of pytest-order plugin
# to ensure tests are running in order of appears in the file.
# It's important for test_perf_pgbench.py::test_pgbench_remote_* tests
extra_params: -m remote_cluster --sparse-ordering --timeout 5400 --ignore test_runner/performance/test_perf_olap.py
extra_params: -m remote_cluster --sparse-ordering --timeout 5400 --ignore test_runner/performance/test_perf_olap.py --ignore test_runner/performance/test_perf_pgvector_queries.py
env:
BENCHMARK_CONNSTR: ${{ steps.create-neon-project.outputs.dsn }}
VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"

@@ -410,14 +410,14 @@ jobs:
PERF_TEST_RESULT_CONNSTR: "${{ secrets.PERF_TEST_RESULT_CONNSTR }}"
BENCHMARK_CONNSTR: ${{ steps.set-up-connstr.outputs.connstr }}

- name: Benchmark pgvector hnsw queries
- name: Benchmark pgvector queries
uses: ./.github/actions/run-python-test-set
with:
build_type: ${{ env.BUILD_TYPE }}
test_selection: performance
test_selection: performance/test_perf_pgvector_queries.py
run_in_parallel: false
save_perf_report: ${{ env.SAVE_PERF_REPORT }}
extra_params: -m remote_cluster --timeout 21600 -k test_pgbench_remote_pgvector
extra_params: -m remote_cluster --timeout 21600
env:
BENCHMARK_CONNSTR: ${{ steps.set-up-connstr.outputs.connstr }}
VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"

@@ -30,7 +30,6 @@ jobs:
check-image:
uses: ./.github/workflows/check-build-tools-image.yml

# This job uses older version of GitHub Actions because it's run on gen2 runners, which don't support node 20 (for newer versions)
build-image:
needs: [ check-image ]
if: needs.check-image.outputs.found == 'false'

@@ -55,7 +54,7 @@ jobs:
exit 1
fi

- uses: actions/checkout@v3
- uses: actions/checkout@v4

# Use custom DOCKER_CONFIG directory to avoid conflicts with default settings
# The default value is ~/.docker
.github/workflows/build_and_test.yml (vendored, 68 changed lines)

@@ -299,21 +299,21 @@ jobs:
uses: actions/cache@v4
with:
path: pg_install/v14
key: v1-${{ runner.os }}-${{ matrix.build_type }}-pg-${{ steps.pg_v14_rev.outputs.pg_rev }}-${{ hashFiles('Makefile') }}
key: v1-${{ runner.os }}-${{ matrix.build_type }}-pg-${{ steps.pg_v14_rev.outputs.pg_rev }}-${{ hashFiles('Makefile', 'Dockerfile.build-tools') }}

- name: Cache postgres v15 build
id: cache_pg_15
uses: actions/cache@v4
with:
path: pg_install/v15
key: v1-${{ runner.os }}-${{ matrix.build_type }}-pg-${{ steps.pg_v15_rev.outputs.pg_rev }}-${{ hashFiles('Makefile') }}
key: v1-${{ runner.os }}-${{ matrix.build_type }}-pg-${{ steps.pg_v15_rev.outputs.pg_rev }}-${{ hashFiles('Makefile', 'Dockerfile.build-tools') }}

- name: Cache postgres v16 build
id: cache_pg_16
uses: actions/cache@v4
with:
path: pg_install/v16
key: v1-${{ runner.os }}-${{ matrix.build_type }}-pg-${{ steps.pg_v16_rev.outputs.pg_rev }}-${{ hashFiles('Makefile') }}
key: v1-${{ runner.os }}-${{ matrix.build_type }}-pg-${{ steps.pg_v16_rev.outputs.pg_rev }}-${{ hashFiles('Makefile', 'Dockerfile.build-tools') }}

- name: Build postgres v14
if: steps.cache_pg_14.outputs.cache-hit != 'true'

@@ -337,34 +337,8 @@ jobs:
run: |
${cov_prefix} mold -run cargo build $CARGO_FLAGS $CARGO_FEATURES --bins --tests

- name: Run rust tests
env:
NEXTEST_RETRIES: 3
run: |
#nextest does not yet support running doctests
cargo test --doc $CARGO_FLAGS $CARGO_FEATURES

for io_engine in std-fs tokio-epoll-uring ; do
NEON_PAGESERVER_UNIT_TEST_VIRTUAL_FILE_IOENGINE=$io_engine ${cov_prefix} cargo nextest run $CARGO_FLAGS $CARGO_FEATURES
done

# Run separate tests for real S3
export ENABLE_REAL_S3_REMOTE_STORAGE=nonempty
export REMOTE_STORAGE_S3_BUCKET=neon-github-ci-tests
export REMOTE_STORAGE_S3_REGION=eu-central-1
# Avoid `$CARGO_FEATURES` since there's no `testing` feature in the e2e tests now
${cov_prefix} cargo nextest run $CARGO_FLAGS -E 'package(remote_storage)' -E 'test(test_real_s3)'

# Run separate tests for real Azure Blob Storage
# XXX: replace region with `eu-central-1`-like region
export ENABLE_REAL_AZURE_REMOTE_STORAGE=y
export AZURE_STORAGE_ACCOUNT="${{ secrets.AZURE_STORAGE_ACCOUNT_DEV }}"
export AZURE_STORAGE_ACCESS_KEY="${{ secrets.AZURE_STORAGE_ACCESS_KEY_DEV }}"
export REMOTE_STORAGE_AZURE_CONTAINER="${{ vars.REMOTE_STORAGE_AZURE_CONTAINER }}"
export REMOTE_STORAGE_AZURE_REGION="${{ vars.REMOTE_STORAGE_AZURE_REGION }}"
# Avoid `$CARGO_FEATURES` since there's no `testing` feature in the e2e tests now
${cov_prefix} cargo nextest run $CARGO_FLAGS -E 'package(remote_storage)' -E 'test(test_real_azure)'

# Do install *before* running rust tests because they might recompile the
# binaries with different features/flags.
- name: Install rust binaries
run: |
# Install target binaries

@@ -405,6 +379,32 @@ jobs:
done
fi

- name: Run rust tests
env:
NEXTEST_RETRIES: 3
run: |
#nextest does not yet support running doctests
cargo test --doc $CARGO_FLAGS $CARGO_FEATURES

for io_engine in std-fs tokio-epoll-uring ; do
NEON_PAGESERVER_UNIT_TEST_VIRTUAL_FILE_IOENGINE=$io_engine ${cov_prefix} cargo nextest run $CARGO_FLAGS $CARGO_FEATURES
done

# Run separate tests for real S3
export ENABLE_REAL_S3_REMOTE_STORAGE=nonempty
export REMOTE_STORAGE_S3_BUCKET=neon-github-ci-tests
export REMOTE_STORAGE_S3_REGION=eu-central-1
${cov_prefix} cargo nextest run $CARGO_FLAGS $CARGO_FEATURES -E 'package(remote_storage)' -E 'test(test_real_s3)'

# Run separate tests for real Azure Blob Storage
# XXX: replace region with `eu-central-1`-like region
export ENABLE_REAL_AZURE_REMOTE_STORAGE=y
export AZURE_STORAGE_ACCOUNT="${{ secrets.AZURE_STORAGE_ACCOUNT_DEV }}"
export AZURE_STORAGE_ACCESS_KEY="${{ secrets.AZURE_STORAGE_ACCESS_KEY_DEV }}"
export REMOTE_STORAGE_AZURE_CONTAINER="${{ vars.REMOTE_STORAGE_AZURE_CONTAINER }}"
export REMOTE_STORAGE_AZURE_REGION="${{ vars.REMOTE_STORAGE_AZURE_REGION }}"
${cov_prefix} cargo nextest run $CARGO_FLAGS $CARGO_FEATURES -E 'package(remote_storage)' -E 'test(test_real_azure)'

- name: Install postgres binaries
run: cp -a pg_install /tmp/neon/pg_install

@@ -858,7 +858,7 @@ jobs:
cache-to: type=registry,ref=neondatabase/compute-node-${{ matrix.version }}:cache-${{ matrix.arch }},mode=max
tags: |
neondatabase/compute-node-${{ matrix.version }}:${{ needs.tag.outputs.build-tag }}-${{ matrix.arch }}

- name: Build neon extensions test image
if: matrix.version == 'v16'
uses: docker/build-push-action@v5

@@ -965,7 +965,7 @@ jobs:

steps:
- name: Checkout
uses: actions/checkout@v1
uses: actions/checkout@v4
with:
fetch-depth: 0

@@ -1101,6 +1101,8 @@ jobs:
$repo/vm-compute-node-${version}:${{ needs.tag.outputs.build-tag }}
done
done
docker buildx imagetools create -t neondatabase/neon-test-extensions-v16:latest \
neondatabase/neon-test-extensions-v16:${{ needs.tag.outputs.build-tag }}

trigger-custom-extensions-build-and-wait:
needs: [ check-permissions, tag ]
.github/workflows/check-build-tools-image.yml (vendored, 23 changed lines)

@@ -25,26 +25,17 @@ jobs:
found: ${{ steps.check-image.outputs.found }}

steps:
- uses: actions/checkout@v4

- name: Get build-tools image tag for the current commit
id: get-build-tools-tag
env:
# Usually, for COMMIT_SHA, we use `github.event.pull_request.head.sha || github.sha`, but here, even for PRs,
# we want to use `github.sha` i.e. point to a phantom merge commit to determine the image tag correctly.
COMMIT_SHA: ${{ github.sha }}
GH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
IMAGE_TAG: |
${{ hashFiles('Dockerfile.build-tools',
'.github/workflows/check-build-tools-image.yml',
'.github/workflows/build-build-tools-image.yml') }}
run: |
LAST_BUILD_TOOLS_SHA=$(
gh api \
-H "Accept: application/vnd.github+json" \
-H "X-GitHub-Api-Version: 2022-11-28" \
--method GET \
--field path=Dockerfile.build-tools \
--field sha=${COMMIT_SHA} \
--field per_page=1 \
--jq ".[0].sha" \
"/repos/${GITHUB_REPOSITORY}/commits"
)
echo "image-tag=${LAST_BUILD_TOOLS_SHA}" | tee -a $GITHUB_OUTPUT
echo "image-tag=${IMAGE_TAG}" | tee -a $GITHUB_OUTPUT

- name: Check if such tag found in the registry
id: check-image
Cargo.lock (generated, 1 changed line)

@@ -5158,6 +5158,7 @@ dependencies = [
"tokio-io-timeout",
"tokio-postgres",
"tokio-stream",
"tokio-tar",
"tokio-util",
"toml_edit",
"tracing",

@@ -120,7 +120,7 @@ num_cpus = "1.15"
num-traits = "0.2.15"
once_cell = "1.13"
opentelemetry = "0.20.0"
opentelemetry-otlp = { version = "0.13.0", default_features=false, features = ["http-proto", "trace", "http", "reqwest-client"] }
opentelemetry-otlp = { version = "0.13.0", default-features=false, features = ["http-proto", "trace", "http", "reqwest-client"] }
opentelemetry-semantic-conventions = "0.12.0"
parking_lot = "0.12"
parquet = { version = "51.0.0", default-features = false, features = ["zstd"] }

@@ -128,7 +128,7 @@ parquet_derive = "51.0.0"
pbkdf2 = { version = "0.12.1", features = ["simple", "std"] }
pin-project-lite = "0.2"
procfs = "0.14"
prometheus = {version = "0.13", default_features=false, features = ["process"]} # removes protobuf dependency
prometheus = {version = "0.13", default-features=false, features = ["process"]} # removes protobuf dependency
prost = "0.11"
rand = "0.8"
redis = { version = "0.25.2", features = ["tokio-rustls-comp", "keep-alive"] }

@@ -184,7 +184,7 @@ tower-service = "0.3.2"
tracing = "0.1"
tracing-error = "0.2.0"
tracing-opentelemetry = "0.21.0"
tracing-subscriber = { version = "0.3", default_features = false, features = ["smallvec", "fmt", "tracing-log", "std", "env-filter", "json", "ansi"] }
tracing-subscriber = { version = "0.3", default-features = false, features = ["smallvec", "fmt", "tracing-log", "std", "env-filter", "json", "ansi"] }
twox-hash = { version = "1.6.3", default-features = false }
url = "2.2"
urlencoding = "2.1"

@@ -69,8 +69,6 @@ RUN set -e \
&& apt install -y \
libreadline-dev \
libseccomp-dev \
libicu67 \
openssl \
ca-certificates \
&& rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/* \
&& useradd -d /data neon \

@@ -112,6 +112,45 @@ RUN for package in Capture::Tiny DateTime Devel::Cover Digest::MD5 File::Spec JS
&& make install \
&& rm -rf ../lcov.tar.gz

# Compile and install the static OpenSSL library
ENV OPENSSL_VERSION=3.2.2
ENV OPENSSL_PREFIX=/usr/local/openssl
RUN wget -O /tmp/openssl-${OPENSSL_VERSION}.tar.gz https://www.openssl.org/source/openssl-${OPENSSL_VERSION}.tar.gz && \
echo "197149c18d9e9f292c43f0400acaba12e5f52cacfe050f3d199277ea738ec2e7 /tmp/openssl-${OPENSSL_VERSION}.tar.gz" | sha256sum --check && \
cd /tmp && \
tar xzvf /tmp/openssl-${OPENSSL_VERSION}.tar.gz && \
rm /tmp/openssl-${OPENSSL_VERSION}.tar.gz && \
cd /tmp/openssl-${OPENSSL_VERSION} && \
./config --prefix=${OPENSSL_PREFIX} -static --static no-shared -fPIC && \
make -j "$(nproc)" && \
make install && \
cd /tmp && \
rm -rf /tmp/openssl-${OPENSSL_VERSION}

# Use the same version of libicu as the compute nodes so that
# clusters created using inidb on pageserver can be used by computes.
#
# TODO: at this time, Dockerfile.compute-node uses the debian bullseye libicu
# package, which is 67.1. We're duplicating that knowledge here, and also, technically,
# Debian has a few patches on top of 67.1 that we're not adding here.
ENV ICU_VERSION=67.1
ENV ICU_PREFIX=/usr/local/icu

# Download and build static ICU
RUN wget -O /tmp/libicu-${ICU_VERSION}.tgz https://github.com/unicode-org/icu/releases/download/release-${ICU_VERSION//./-}/icu4c-${ICU_VERSION//./_}-src.tgz && \
echo "94a80cd6f251a53bd2a997f6f1b5ac6653fe791dfab66e1eb0227740fb86d5dc /tmp/libicu-${ICU_VERSION}.tgz" | sha256sum --check && \
mkdir /tmp/icu && \
pushd /tmp/icu && \
tar -xzf /tmp/libicu-${ICU_VERSION}.tgz && \
pushd icu/source && \
./configure --prefix=${ICU_PREFIX} --enable-static --enable-shared=no CXXFLAGS="-fPIC" CFLAGS="-fPIC" && \
make -j "$(nproc)" && \
make install && \
popd && \
rm -rf icu && \
rm -f /tmp/libicu-${ICU_VERSION}.tgz && \
popd

# Switch to nonroot user
USER nonroot:nonroot
WORKDIR /home/nonroot

@@ -141,7 +180,7 @@ WORKDIR /home/nonroot

# Rust
# Please keep the version of llvm (installed above) in sync with rust llvm (`rustc --version --verbose | grep LLVM`)
ENV RUSTC_VERSION=1.78.0
ENV RUSTC_VERSION=1.79.0
ENV RUSTUP_HOME="/home/nonroot/.rustup"
ENV PATH="/home/nonroot/.cargo/bin:${PATH}"
RUN curl -sSO https://static.rust-lang.org/rustup/dist/$(uname -m)-unknown-linux-gnu/rustup-init && whoami && \

@@ -170,3 +209,6 @@ RUN whoami \
&& rustup --version --verbose \
&& rustc --version --verbose \
&& clang --version

# Set following flag to check in Makefile if its running in Docker
RUN touch /home/nonroot/.docker_build

@@ -246,8 +246,8 @@ COPY patches/pgvector.patch /pgvector.patch
# By default, pgvector Makefile uses `-march=native`. We don't want that,
# because we build the images on different machines than where we run them.
# Pass OPTFLAGS="" to remove it.
RUN wget https://github.com/pgvector/pgvector/archive/refs/tags/v0.7.1.tar.gz -O pgvector.tar.gz && \
echo "fe6c8cb4e0cd1a8cb60f5badf9e1701e0fcabcfc260931c26d01e155c4dd21d1 pgvector.tar.gz" | sha256sum --check && \
RUN wget https://github.com/pgvector/pgvector/archive/refs/tags/v0.7.2.tar.gz -O pgvector.tar.gz && \
echo "617fba855c9bcb41a2a9bc78a78567fd2e147c72afd5bf9d37b31b9591632b30 pgvector.tar.gz" | sha256sum --check && \
mkdir pgvector-src && cd pgvector-src && tar xzf ../pgvector.tar.gz --strip-components=1 -C . && \
patch -p1 < /pgvector.patch && \
make -j $(getconf _NPROCESSORS_ONLN) OPTFLAGS="" PG_CONFIG=/usr/local/pgsql/bin/pg_config && \

@@ -979,7 +979,7 @@ RUN cd /ext-src/ && for f in *.tar.gz; \
do echo $f; dname=$(echo $f | sed 's/\.tar.*//')-src; \
rm -rf $dname; mkdir $dname; tar xzf $f --strip-components=1 -C $dname \
|| exit 1; rm -f $f; done
RUN cd /ext-src/pgvector-src && patch -p1 <../pgvector.patch
# cmake is required for the h3 test
RUN apt-get update && apt-get install -y cmake
RUN patch -p1 < /ext-src/pg_hintplan.patch
Makefile (15 changed lines)

@@ -3,6 +3,9 @@ ROOT_PROJECT_DIR := $(dir $(abspath $(lastword $(MAKEFILE_LIST))))
# Where to install Postgres, default is ./pg_install, maybe useful for package managers
POSTGRES_INSTALL_DIR ?= $(ROOT_PROJECT_DIR)/pg_install/

OPENSSL_PREFIX_DIR := /usr/local/openssl
ICU_PREFIX_DIR := /usr/local/icu

#
# We differentiate between release / debug build types using the BUILD_TYPE
# environment variable.

@@ -20,6 +23,16 @@ else
$(error Bad build type '$(BUILD_TYPE)', see Makefile for options)
endif

ifeq ($(shell test -e /home/nonroot/.docker_build && echo -n yes),yes)
# Exclude static build openssl, icu for local build (MacOS, Linux)
# Only keep for build type release and debug
PG_CFLAGS += -I$(OPENSSL_PREFIX_DIR)/include
PG_CONFIGURE_OPTS += --with-icu
PG_CONFIGURE_OPTS += ICU_CFLAGS='-I/$(ICU_PREFIX_DIR)/include -DU_STATIC_IMPLEMENTATION'
PG_CONFIGURE_OPTS += ICU_LIBS='-L$(ICU_PREFIX_DIR)/lib -L$(ICU_PREFIX_DIR)/lib64 -licui18n -licuuc -licudata -lstdc++ -Wl,-Bdynamic -lm'
PG_CONFIGURE_OPTS += LDFLAGS='-L$(OPENSSL_PREFIX_DIR)/lib -L$(OPENSSL_PREFIX_DIR)/lib64 -L$(ICU_PREFIX_DIR)/lib -L$(ICU_PREFIX_DIR)/lib64 -Wl,-Bstatic -lssl -lcrypto -Wl,-Bdynamic -lrt -lm -ldl -lpthread'
endif

UNAME_S := $(shell uname -s)
ifeq ($(UNAME_S),Linux)
# Seccomp BPF is only available for Linux

@@ -28,7 +41,7 @@ else ifeq ($(UNAME_S),Darwin)
ifndef DISABLE_HOMEBREW
# macOS with brew-installed openssl requires explicit paths
# It can be configured with OPENSSL_PREFIX variable
OPENSSL_PREFIX ?= $(shell brew --prefix openssl@3)
OPENSSL_PREFIX := $(shell brew --prefix openssl@3)
PG_CONFIGURE_OPTS += --with-includes=$(OPENSSL_PREFIX)/include --with-libraries=$(OPENSSL_PREFIX)/lib
PG_CONFIGURE_OPTS += PKG_CONFIG_PATH=$(shell brew --prefix icu4c)/lib/pkgconfig
# macOS already has bison and flex in the system, but they are old and result in postgres-v14 target failure
@@ -735,7 +735,7 @@ fn cli() -> clap::Command {
Arg::new("filecache-connstr")
.long("filecache-connstr")
.default_value(
"host=localhost port=5432 dbname=postgres user=cloud_admin sslmode=disable",
"host=localhost port=5432 dbname=postgres user=cloud_admin sslmode=disable application_name=vm-monitor",
)
.value_name("FILECACHE_CONNSTR"),
)

@@ -918,38 +918,39 @@ impl ComputeNode {
// temporarily reset max_cluster_size in config
// to avoid the possibility of hitting the limit, while we are reconfiguring:
// creating new extensions, roles, etc...
config::compute_ctl_temp_override_create(pgdata_path, "neon.max_cluster_size=-1")?;
self.pg_reload_conf()?;
config::with_compute_ctl_tmp_override(pgdata_path, "neon.max_cluster_size=-1", || {
self.pg_reload_conf()?;

let mut client = Client::connect(self.connstr.as_str(), NoTls)?;

// Proceed with post-startup configuration. Note, that order of operations is important.
// Disable DDL forwarding because control plane already knows about these roles/databases.
if spec.mode == ComputeMode::Primary {
client.simple_query("SET neon.forward_ddl = false")?;
cleanup_instance(&mut client)?;
handle_roles(&spec, &mut client)?;
handle_databases(&spec, &mut client)?;
handle_role_deletions(&spec, self.connstr.as_str(), &mut client)?;
handle_grants(
&spec,
&mut client,
self.connstr.as_str(),
self.has_feature(ComputeFeature::AnonExtension),
)?;
handle_extensions(&spec, &mut client)?;
handle_extension_neon(&mut client)?;
// We can skip handle_migrations here because a new migration can only appear
// if we have a new version of the compute_ctl binary, which can only happen
// if compute got restarted, in which case we'll end up inside of apply_config
// instead of reconfigure.
}

// 'Close' connection
drop(client);

Ok(())
})?;

// reset max_cluster_size in config back to original value and reload config
config::compute_ctl_temp_override_remove(pgdata_path)?;
self.pg_reload_conf()?;

let unknown_op = "unknown".to_string();

@@ -1040,12 +1041,17 @@ impl ComputeNode {
// temporarily reset max_cluster_size in config
// to avoid the possibility of hitting the limit, while we are applying config:
// creating new extensions, roles, etc...
config::compute_ctl_temp_override_create(pgdata_path, "neon.max_cluster_size=-1")?;
self.pg_reload_conf()?;
config::with_compute_ctl_tmp_override(
pgdata_path,
"neon.max_cluster_size=-1",
|| {
self.pg_reload_conf()?;

self.apply_config(&compute_state)?;

config::compute_ctl_temp_override_remove(pgdata_path)?;
Ok(())
},
)?;
self.pg_reload_conf()?;
}
self.post_apply_config()?;

@@ -131,18 +131,17 @@ pub fn write_postgres_conf(
Ok(())
}

/// create file compute_ctl_temp_override.conf in pgdata_dir
/// add provided options to this file
pub fn compute_ctl_temp_override_create(pgdata_path: &Path, options: &str) -> Result<()> {
pub fn with_compute_ctl_tmp_override<F>(pgdata_path: &Path, options: &str, exec: F) -> Result<()>
where
F: FnOnce() -> Result<()>,
{
let path = pgdata_path.join("compute_ctl_temp_override.conf");
let mut file = File::create(path)?;
write!(file, "{}", options)?;
Ok(())
}

/// remove file compute_ctl_temp_override.conf in pgdata_dir
pub fn compute_ctl_temp_override_remove(pgdata_path: &Path) -> Result<()> {
let path = pgdata_path.join("compute_ctl_temp_override.conf");
std::fs::remove_file(path)?;
Ok(())
let res = exec();

file.set_len(0)?;

res
}
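As an aside, here is a minimal sketch of the closure-based override pattern that the hunks above introduce: the temporary config file is emptied after the closure runs, whether or not it succeeded. The helper body mirrors the diff; the /tmp path and the `main` harness are hypothetical stand-ins.

```rust
use std::{fs::{self, File}, io::Write, path::Path};
use anyhow::Result;

/// Write a temporary config override, run `exec`, then clear the override
/// file again regardless of whether the closure succeeded.
fn with_tmp_override<F>(pgdata_path: &Path, options: &str, exec: F) -> Result<()>
where
    F: FnOnce() -> Result<()>,
{
    let path = pgdata_path.join("compute_ctl_temp_override.conf");
    let mut file = File::create(&path)?;
    write!(file, "{options}")?;

    let res = exec();   // e.g. reload config and apply spec changes

    file.set_len(0)?;   // clear the override even if `exec` returned an error
    res
}

fn main() -> Result<()> {
    let pgdata = Path::new("/tmp/pgdata-demo"); // hypothetical data directory
    fs::create_dir_all(pgdata)?;
    with_tmp_override(pgdata, "neon.max_cluster_size=-1", || {
        // reconfiguration work would go here
        Ok(())
    })
}
```

Compared with the old create/remove pair, the cleanup can no longer be skipped by an early `?` return between the two calls.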
@@ -17,7 +17,7 @@ use hyper::header::CONTENT_TYPE;
use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Method, Request, Response, Server, StatusCode};
use tokio::task;
use tracing::{error, info, warn};
use tracing::{debug, error, info, warn};
use tracing_utils::http::OtelName;
use utils::http::request::must_get_query_param;

@@ -48,7 +48,7 @@ async fn routes(req: Request<Body>, compute: &Arc<ComputeNode>) -> Response<Body
match (req.method(), req.uri().path()) {
// Serialized compute state.
(&Method::GET, "/status") => {
info!("serving /status GET request");
debug!("serving /status GET request");
let state = compute.state.lock().unwrap();
let status_response = status_response_from_state(&state);
Response::new(Body::from(serde_json::to_string(&status_response).unwrap()))

@@ -862,20 +862,13 @@ async fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Re

let allow_multiple = sub_args.get_flag("allow-multiple");

// If --safekeepers argument is given, use only the listed safekeeper nodes.
let safekeepers =
if let Some(safekeepers_str) = sub_args.get_one::<String>("safekeepers") {
let mut safekeepers: Vec<NodeId> = Vec::new();
for sk_id in safekeepers_str.split(',').map(str::trim) {
let sk_id = NodeId(u64::from_str(sk_id).map_err(|_| {
anyhow!("invalid node ID \"{sk_id}\" in --safekeepers list")
})?);
safekeepers.push(sk_id);
}
safekeepers
} else {
env.safekeepers.iter().map(|sk| sk.id).collect()
};
// If --safekeepers argument is given, use only the listed
// safekeeper nodes; otherwise all from the env.
let safekeepers = if let Some(safekeepers) = parse_safekeepers(&sub_args)? {
safekeepers
} else {
env.safekeepers.iter().map(|sk| sk.id).collect()
};

let endpoint = cplane
.endpoints

@@ -979,7 +972,10 @@ async fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Re
})
.collect::<Vec<_>>()
};
endpoint.reconfigure(pageservers, None).await?;
// If --safekeepers argument is given, use only the listed
// safekeeper nodes; otherwise all from the env.
let safekeepers = parse_safekeepers(&sub_args)?;
endpoint.reconfigure(pageservers, None, safekeepers).await?;
}
"stop" => {
let endpoint_id = sub_args

@@ -1001,6 +997,23 @@ async fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Re
Ok(())
}

/// Parse --safekeepers as list of safekeeper ids.
fn parse_safekeepers(sub_args: &ArgMatches) -> Result<Option<Vec<NodeId>>> {
if let Some(safekeepers_str) = sub_args.get_one::<String>("safekeepers") {
let mut safekeepers: Vec<NodeId> = Vec::new();
for sk_id in safekeepers_str.split(',').map(str::trim) {
let sk_id = NodeId(
u64::from_str(sk_id)
.map_err(|_| anyhow!("invalid node ID \"{sk_id}\" in --safekeepers list"))?,
);
safekeepers.push(sk_id);
}
Ok(Some(safekeepers))
} else {
Ok(None)
}
}
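For illustration, a small self-contained sketch of the same parsing rule (split on commas, trim whitespace, reject non-numeric IDs). `NodeId` here is a hypothetical stand-in for the real type from the `utils` crate, and the function signature is simplified to take the raw string rather than clap's `ArgMatches`.

```rust
use anyhow::{anyhow, Result};
use std::str::FromStr;

/// Hypothetical stand-in for utils::id::NodeId.
#[derive(Debug, PartialEq)]
struct NodeId(u64);

/// Parse an optional `--safekeepers` value: None means "not given".
fn parse_safekeepers(arg: Option<&str>) -> Result<Option<Vec<NodeId>>> {
    let Some(s) = arg else { return Ok(None) };
    s.split(',')
        .map(str::trim)
        .map(|id| {
            u64::from_str(id)
                .map(NodeId)
                .map_err(|_| anyhow!("invalid node ID \"{id}\" in --safekeepers list"))
        })
        .collect::<Result<Vec<_>>>()
        .map(Some)
}

fn main() -> Result<()> {
    assert_eq!(
        parse_safekeepers(Some("1, 2,3"))?,
        Some(vec![NodeId(1), NodeId(2), NodeId(3)])
    );
    assert!(parse_safekeepers(Some("1,x")).is_err());
    assert_eq!(parse_safekeepers(None)?, None);
    Ok(())
}
```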
fn handle_mappings(sub_match: &ArgMatches, env: &mut local_env::LocalEnv) -> Result<()> {
let (sub_name, sub_args) = match sub_match.subcommand() {
Some(ep_subcommand_data) => ep_subcommand_data,

@@ -1573,7 +1586,7 @@ fn cli() -> Command {
.about("Start postgres.\n If the endpoint doesn't exist yet, it is created.")
.arg(endpoint_id_arg.clone())
.arg(endpoint_pageserver_id_arg.clone())
.arg(safekeepers_arg)
.arg(safekeepers_arg.clone())
.arg(remote_ext_config_args)
.arg(create_test_user)
.arg(allow_multiple.clone())

@@ -1581,6 +1594,7 @@ fn cli() -> Command {
.subcommand(Command::new("reconfigure")
.about("Reconfigure the endpoint")
.arg(endpoint_pageserver_id_arg)
.arg(safekeepers_arg)
.arg(endpoint_id_arg.clone())
.arg(tenant_id_arg.clone())
)

@@ -499,6 +499,23 @@ impl Endpoint {
.join(",")
}

/// Map safekeepers ids to the actual connection strings.
fn build_safekeepers_connstrs(&self, sk_ids: Vec<NodeId>) -> Result<Vec<String>> {
let mut safekeeper_connstrings = Vec::new();
if self.mode == ComputeMode::Primary {
for sk_id in sk_ids {
let sk = self
.env
.safekeepers
.iter()
.find(|node| node.id == sk_id)
.ok_or_else(|| anyhow!("safekeeper {sk_id} does not exist"))?;
safekeeper_connstrings.push(format!("127.0.0.1:{}", sk.get_compute_port()));
}
}
Ok(safekeeper_connstrings)
}

pub async fn start(
&self,
auth_token: &Option<String>,

@@ -523,18 +540,7 @@ impl Endpoint {
let pageserver_connstring = Self::build_pageserver_connstr(&pageservers);
assert!(!pageserver_connstring.is_empty());

let mut safekeeper_connstrings = Vec::new();
if self.mode == ComputeMode::Primary {
for sk_id in safekeepers {
let sk = self
.env
.safekeepers
.iter()
.find(|node| node.id == sk_id)
.ok_or_else(|| anyhow!("safekeeper {sk_id} does not exist"))?;
safekeeper_connstrings.push(format!("127.0.0.1:{}", sk.get_compute_port()));
}
}
let safekeeper_connstrings = self.build_safekeepers_connstrs(safekeepers)?;

// check for file remote_extensions_spec.json
// if it is present, read it and pass to compute_ctl

@@ -741,6 +747,7 @@ impl Endpoint {
&self,
mut pageservers: Vec<(Host, u16)>,
stripe_size: Option<ShardStripeSize>,
safekeepers: Option<Vec<NodeId>>,
) -> Result<()> {
let mut spec: ComputeSpec = {
let spec_path = self.endpoint_path().join("spec.json");

@@ -775,6 +782,12 @@ impl Endpoint {
spec.shard_stripe_size = stripe_size.map(|s| s.0 as usize);
}

// If safekeepers are not specified, don't change them.
if let Some(safekeepers) = safekeepers {
let safekeeper_connstrings = self.build_safekeepers_connstrs(safekeepers)?;
spec.safekeeper_connstrings = safekeeper_connstrings;
}

let client = reqwest::Client::builder()
.timeout(Duration::from_secs(30))
.build()

@@ -14,6 +14,7 @@ use camino::Utf8PathBuf;
use postgres_connection::PgConnectionConfig;
use reqwest::{IntoUrl, Method};
use thiserror::Error;
use utils::auth::{Claims, Scope};
use utils::{http::error::HttpErrorBody, id::NodeId};

use crate::{

@@ -197,7 +198,7 @@ impl SafekeeperNode {
&datadir,
&self.env.safekeeper_bin(),
&args,
[],
self.safekeeper_env_variables()?,
background_process::InitialPidFile::Expect(self.pid_file()),
|| async {
match self.check_status().await {

@@ -210,6 +211,18 @@ impl SafekeeperNode {
.await
}

fn safekeeper_env_variables(&self) -> anyhow::Result<Vec<(String, String)>> {
// Generate a token to connect from safekeeper to peers
if self.conf.auth_enabled {
let token = self
.env
.generate_auth_token(&Claims::new(None, Scope::SafekeeperData))?;
Ok(vec![("SAFEKEEPER_AUTH_TOKEN".to_owned(), token)])
} else {
Ok(Vec::new())
}
}

///
/// Stop the server.
///

@@ -786,18 +786,15 @@ async fn main() -> anyhow::Result<()> {
anyhow::bail!("Drain requested for node which doesn't exist.")
}

let can_fill = node_to_fill_descs
.iter()
.filter(|desc| {
matches!(desc.availability, NodeAvailabilityWrapper::Active)
&& matches!(
desc.scheduling,
NodeSchedulingPolicy::Active | NodeSchedulingPolicy::Filling
)
})
.any(|_| true);
node_to_fill_descs.retain(|desc| {
matches!(desc.availability, NodeAvailabilityWrapper::Active)
&& matches!(
desc.scheduling,
NodeSchedulingPolicy::Active | NodeSchedulingPolicy::Filling
)
});

if !can_fill {
if node_to_fill_descs.is_empty() {
anyhow::bail!("There are no nodes to drain to")
}
@@ -194,6 +194,7 @@ services:
- compute

neon-test-extensions:
profiles: ["test-extensions"]
image: ${REPOSITORY:-neondatabase}/neon-test-extensions-v${PG_TEST_VERSION:-16}:${TAG:-latest}
entrypoint:
- "/bin/bash"

@@ -15,7 +15,6 @@ set -eux -o pipefail

COMPOSE_FILE='docker-compose.yml'
cd $(dirname $0)
docker compose -f $COMPOSE_FILE
COMPUTE_CONTAINER_NAME=docker-compose-compute-1
TEST_CONTAINER_NAME=docker-compose-neon-test-extensions-1
PSQL_OPTION="-h localhost -U cloud_admin -p 55433 -d postgres"

@@ -26,16 +25,16 @@ export http_proxy https_proxy
cleanup() {
echo "show container information"
docker ps
docker compose -f $COMPOSE_FILE logs
docker compose --profile test-extensions -f $COMPOSE_FILE logs
echo "stop containers..."
docker compose -f $COMPOSE_FILE down
docker compose --profile test-extensions -f $COMPOSE_FILE down
}

for pg_version in 14 15 16; do
echo "clean up containers if exists"
cleanup
PG_TEST_VERSION=$(($pg_version < 16 ? 16 : $pg_version))
PG_VERSION=$pg_version PG_TEST_VERSION=$PG_TEST_VERSION docker compose -f $COMPOSE_FILE up --build -d
PG_VERSION=$pg_version PG_TEST_VERSION=$PG_TEST_VERSION docker compose --profile test-extensions -f $COMPOSE_FILE up --build -d

echo "wait until the compute is ready. timeout after 60s. "
cnt=0

@@ -47,7 +46,7 @@ for pg_version in 14 15 16; do
cleanup
exit 1
fi
if docker compose -f $COMPOSE_FILE logs "compute_is_ready" | grep -q "accepting connections"; then
if docker compose --profile test-extensions -f $COMPOSE_FILE logs "compute_is_ready" | grep -q "accepting connections"; then
echo "OK. The compute is ready to connect."
echo "execute simple queries."
docker exec $COMPUTE_CONTAINER_NAME /bin/bash -c "psql $PSQL_OPTION"
@@ -4,18 +4,18 @@

Currently we build two main images:

- [neondatabase/neon](https://hub.docker.com/repository/docker/zenithdb/zenith) — image with pre-built `pageserver`, `safekeeper` and `proxy` binaries and all the required runtime dependencies. Built from [/Dockerfile](/Dockerfile).
- [neondatabase/compute-node](https://hub.docker.com/repository/docker/zenithdb/compute-node) — compute node image with pre-built Postgres binaries from [neondatabase/postgres](https://github.com/neondatabase/postgres).
- [neondatabase/neon](https://hub.docker.com/repository/docker/neondatabase/neon) — image with pre-built `pageserver`, `safekeeper` and `proxy` binaries and all the required runtime dependencies. Built from [/Dockerfile](/Dockerfile).
- [neondatabase/compute-node-v16](https://hub.docker.com/repository/docker/neondatabase/compute-node-v16) — compute node image with pre-built Postgres binaries from [neondatabase/postgres](https://github.com/neondatabase/postgres). Similar images exist for v15 and v14.

And additional intermediate image:

- [neondatabase/compute-tools](https://hub.docker.com/repository/docker/neondatabase/compute-tools) — compute node configuration management tools.

## Building pipeline
## Build pipeline

We build all images after a successful `release` tests run and push automatically to Docker Hub with two parallel CI jobs

1. `neondatabase/compute-tools` and `neondatabase/compute-node`
1. `neondatabase/compute-tools` and `neondatabase/compute-node-v16` (and -v15 and -v14)

2. `neondatabase/neon`

@@ -34,12 +34,12 @@ You can see a [docker compose](https://docs.docker.com/compose/) example to crea
1. create containers

You can specify version of neon cluster using following environment values.
- PG_VERSION: postgres version for compute (default is 16)
- TAG: the tag version of [docker image](https://registry.hub.docker.com/r/neondatabase/neon/tags) (default is latest), which is tagged in [CI test](/.github/workflows/build_and_test.yml)
- PG_VERSION: postgres version for compute (default is 16 as of this writing)
- TAG: the tag version of [docker image](https://registry.hub.docker.com/r/neondatabase/neon/tags), which is tagged in [CI test](/.github/workflows/build_and_test.yml). Default is 'latest'
```
$ cd docker-compose/
$ docker-compose down # remove the containers if exists
$ PG_VERSION=16 TAG=2937 docker-compose up --build -d # You can specify the postgres and image version
$ PG_VERSION=16 TAG=latest docker-compose up --build -d # You can specify the postgres and image version
Creating network "dockercompose_default" with the default driver
Creating docker-compose_storage_broker_1 ... done
(...omit...)

@@ -47,29 +47,31 @@ Creating docker-compose_storage_broker_1 ... done

2. connect compute node
```
$ echo "localhost:55433:postgres:cloud_admin:cloud_admin" >> ~/.pgpass
$ chmod 600 ~/.pgpass
$ psql -h localhost -p 55433 -U cloud_admin
$ psql postgresql://cloud_admin:cloud_admin@localhost:55433/postgres
psql (16.3)
Type "help" for help.

postgres=# CREATE TABLE t(key int primary key, value text);
CREATE TABLE
postgres=# insert into t values(1,1);
postgres=# insert into t values(1, 1);
INSERT 0 1
postgres=# select * from t;
 key | value
-----+-------
   1 | 1
(1 row)

```

3. If you want to see the log, you can use `docker-compose logs` command.
```
# check the container name you want to see
$ docker ps
CONTAINER ID   IMAGE                    COMMAND               CREATED         STATUS         PORTS                                              NAMES
d6968a5ae912   dockercompose_compute    "/shell/compute.sh"   5 minutes ago   Up 5 minutes   0.0.0.0:3080->3080/tcp, 0.0.0.0:55433->55433/tcp   dockercompose_compute_1
3582f6d76227   docker-compose_compute   "/shell/compute.sh"   2 minutes ago   Up 2 minutes   0.0.0.0:3080->3080/tcp, :::3080->3080/tcp, 0.0.0.0:55433->55433/tcp, :::55433->55433/tcp   docker-compose_compute_1
(...omit...)

$ docker logs -f dockercompose_compute_1
$ docker logs -f docker-compose_compute_1
2022-10-21 06:15:48.757 GMT [56] LOG: connection authorized: user=cloud_admin database=postgres application_name=psql
2022-10-21 06:17:00.307 GMT [56] LOG: [NEON_SMGR] libpagestore: connected to 'host=pageserver port=6400'
(...omit...)
@@ -558,6 +558,12 @@ impl KeySpaceRandomAccum {
self.ranges.push(range);
}

pub fn add_keyspace(&mut self, keyspace: KeySpace) {
for range in keyspace.ranges {
self.add_range(range);
}
}

pub fn to_keyspace(mut self) -> KeySpace {
let mut ranges = Vec::new();
if !self.ranges.is_empty() {

@@ -7,7 +7,7 @@ license.workspace = true
[dependencies]
hyper.workspace = true
opentelemetry = { workspace = true, features=["rt-tokio"] }
opentelemetry-otlp = { workspace = true, default_features=false, features = ["http-proto", "trace", "http", "reqwest-client"] }
opentelemetry-otlp = { workspace = true, default-features=false, features = ["http-proto", "trace", "http", "reqwest-client"] }
opentelemetry-semantic-conventions.workspace = true
reqwest = { workspace = true, default-features = false, features = ["rustls-tls"] }
tokio = { workspace = true, features = ["rt", "rt-multi-thread"] }
@@ -25,6 +25,8 @@ pub struct Config {
///
/// For simplicity, this value must be greater than or equal to `memory_history_len`.
memory_history_log_interval: usize,
/// The max number of iterations to skip before logging the next iteration
memory_history_log_noskip_interval: Duration,
}

impl Default for Config {

@@ -33,6 +35,7 @@ impl Default for Config {
memory_poll_interval: Duration::from_millis(100),
memory_history_len: 5, // use 500ms of history for decision-making
memory_history_log_interval: 20, // but only log every ~2s (otherwise it's spammy)
memory_history_log_noskip_interval: Duration::from_secs(15), // but only if it's changed, or 60 seconds have passed
}
}
}

@@ -85,7 +88,12 @@ impl CgroupWatcher {

// buffer for samples that will be logged. once full, it remains so.
let history_log_len = self.config.memory_history_log_interval;
let max_skip = self.config.memory_history_log_noskip_interval;
let mut history_log_buf = vec![MemoryStatus::zeroed(); history_log_len];
let mut last_logged_memusage = MemoryStatus::zeroed();

// Ensure that we're tracking a value that's definitely in the past, as Instant::now is only guaranteed to be non-decreasing on Rust's T1-supported systems.
let mut can_skip_logs_until = Instant::now() - max_skip;

for t in 0_u64.. {
ticker.tick().await;

@@ -115,12 +123,24 @@ impl CgroupWatcher {
// equal to the logging interval, we can just log the entire buffer every time we set
// the last entry, which also means that for this log line, we can ignore that it's a
// ring buffer (because all the entries are in order of increasing time).
if i == history_log_len - 1 {
//
// We skip logging the data if data hasn't meaningfully changed in a while, unless
// we've already ignored previous iterations for the last max_skip period.
if i == history_log_len - 1
&& (now > can_skip_logs_until
|| !history_log_buf
.iter()
.all(|usage| last_logged_memusage.status_is_close_or_similar(usage)))
{
info!(
history = ?MemoryStatus::debug_slice(&history_log_buf),
summary = ?summary,
"Recent cgroup memory statistics history"
);

can_skip_logs_until = now + max_skip;

last_logged_memusage = *history_log_buf.last().unwrap();
}

updates

@@ -232,6 +252,24 @@ impl MemoryStatus {

DS(slice)
}

/// Check if the other memory status is a close or similar result.
/// Returns true if the larger value is not larger than the smaller value
/// by 1/8 of the smaller value, and within 128MiB.
/// See tests::check_similarity_behaviour for examples of behaviour
fn status_is_close_or_similar(&self, other: &MemoryStatus) -> bool {
let margin;
let diff;
if self.non_reclaimable >= other.non_reclaimable {
margin = other.non_reclaimable / 8;
diff = self.non_reclaimable - other.non_reclaimable;
} else {
margin = self.non_reclaimable / 8;
diff = other.non_reclaimable - self.non_reclaimable;
}

diff < margin && diff < 128 * 1024 * 1024
}
}
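To make the documented rule concrete, here is a minimal standalone sketch of the same check (gap must be below both 1/8 of the smaller value and 128 MiB) with plain numbers; the free function below is an illustration, not the crate's actual type.

```rust
/// "Close or similar" when the difference is under 1/8 of the smaller value
/// and also under 128 MiB.
fn close_or_similar(a: u64, b: u64) -> bool {
    let (small, large) = if a <= b { (a, b) } else { (b, a) };
    let margin = small / 8;
    let diff = large - small;
    diff < margin && diff < 128 * 1024 * 1024
}

fn main() {
    assert!(close_or_similar(64, 71));  // 7 < 64/8 = 8, and well under 128 MiB
    assert!(!close_or_similar(64, 72)); // 8 is not strictly less than 8
    let tib = 1u64 << 40;
    // At large sizes the 128 MiB cap dominates the 1/8 margin:
    assert!(close_or_similar(tib, tib + 127 * 1024 * 1024));
    assert!(!close_or_similar(tib, tib + 129 * 1024 * 1024));
    println!("similarity rule checks passed");
}
```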
#[cfg(test)]

@@ -261,4 +299,65 @@ mod tests {
assert_eq!(values(2, 4), [9, 0, 1, 2]);
assert_eq!(values(2, 10), [3, 4, 5, 6, 7, 8, 9, 0, 1, 2]);
}

#[test]
fn check_similarity_behaviour() {
// This all accesses private methods, so we can't actually run this
// as doctests, because doctests run as an external crate.
let mut small = super::MemoryStatus {
non_reclaimable: 1024,
};
let mut large = super::MemoryStatus {
non_reclaimable: 1024 * 1024 * 1024 * 1024,
};

// objects are self-similar, no matter the size
assert!(small.status_is_close_or_similar(&small));
assert!(large.status_is_close_or_similar(&large));

// inequality is symmetric
assert!(!small.status_is_close_or_similar(&large));
assert!(!large.status_is_close_or_similar(&small));

small.non_reclaimable = 64;
large.non_reclaimable = (small.non_reclaimable / 8) * 9;

// objects are self-similar, no matter the size
assert!(small.status_is_close_or_similar(&small));
assert!(large.status_is_close_or_similar(&large));

// values are similar if the larger value is larger by less than
// 12.5%, i.e. 1/8 of the smaller value.
// In the example above, large is exactly 12.5% larger, so this doesn't
// match.
assert!(!small.status_is_close_or_similar(&large));
assert!(!large.status_is_close_or_similar(&small));

large.non_reclaimable -= 1;
assert!(large.status_is_close_or_similar(&large));

assert!(small.status_is_close_or_similar(&large));
assert!(large.status_is_close_or_similar(&small));

// The 1/8 rule only applies up to 128MiB of difference
small.non_reclaimable = 1024 * 1024 * 1024 * 1024;
large.non_reclaimable = small.non_reclaimable / 8 * 9;
assert!(small.status_is_close_or_similar(&small));
assert!(large.status_is_close_or_similar(&large));

assert!(!small.status_is_close_or_similar(&large));
assert!(!large.status_is_close_or_similar(&small));
// the large value is put just above the threshold
large.non_reclaimable = small.non_reclaimable + 128 * 1024 * 1024;
assert!(large.status_is_close_or_similar(&large));

assert!(!small.status_is_close_or_similar(&large));
assert!(!large.status_is_close_or_similar(&small));
// now below
large.non_reclaimable -= 1;
assert!(large.status_is_close_or_similar(&large));

assert!(small.status_is_close_or_similar(&large));
assert!(large.status_is_close_or_similar(&small));
}
}
@@ -12,11 +12,11 @@ use futures::{
stream::{SplitSink, SplitStream},
SinkExt, StreamExt,
};
use tracing::info;
use tracing::{debug, info};

use crate::protocol::{
OutboundMsg, ProtocolRange, ProtocolResponse, ProtocolVersion, PROTOCOL_MAX_VERSION,
PROTOCOL_MIN_VERSION,
OutboundMsg, OutboundMsgKind, ProtocolRange, ProtocolResponse, ProtocolVersion,
PROTOCOL_MAX_VERSION, PROTOCOL_MIN_VERSION,
};

/// The central handler for all communications in the monitor.

@@ -118,7 +118,12 @@ impl Dispatcher {
/// serialize the wrong thing and send it, since `self.sink.send` will take
/// any string.
pub async fn send(&mut self, message: OutboundMsg) -> anyhow::Result<()> {
info!(?message, "sending message");
if matches!(&message.inner, OutboundMsgKind::HealthCheck { .. }) {
debug!(?message, "sending message");
} else {
info!(?message, "sending message");
}

let json = serde_json::to_string(&message).context("failed to serialize message")?;
self.sink
.send(Message::Text(json))

@@ -12,7 +12,7 @@ use axum::extract::ws::{Message, WebSocket};
use futures::StreamExt;
use tokio::sync::{broadcast, watch};
use tokio_util::sync::CancellationToken;
use tracing::{error, info, warn};
use tracing::{debug, error, info, warn};

use crate::cgroup::{self, CgroupWatcher};
use crate::dispatcher::Dispatcher;

@@ -474,26 +474,29 @@ impl Runner {
// there is a message from the agent
msg = self.dispatcher.source.next() => {
if let Some(msg) = msg {
// Don't use 'message' as a key as the string also uses
// that for its key
info!(?msg, "received message");
match msg {
match &msg {
Ok(msg) => {
let message: InboundMsg = match msg {
Message::Text(text) => {
serde_json::from_str(&text).context("failed to deserialize text message")?
serde_json::from_str(text).context("failed to deserialize text message")?
}
other => {
warn!(
// Don't use 'message' as a key as the
// string also uses that for its key
msg = ?other,
"agent should only send text messages but received different type"
"problem processing incoming message: agent should only send text messages but received different type"
);
continue
},
};

if matches!(&message.inner, InboundMsgKind::HealthCheck { .. }) {
debug!(?msg, "received message");
} else {
info!(?msg, "received message");
}

let out = match self.process_message(message.clone()).await {
Ok(Some(out)) => out,
Ok(None) => continue,

@@ -517,7 +520,11 @@ impl Runner {
.await
.context("failed to send message")?;
}
Err(e) => warn!("{e}"),
Err(e) => warn!(
error = format!("{e}"),
msg = ?msg,
"received error message"
),
}
} else {
anyhow::bail!("dispatcher connection closed")
@@ -2,10 +2,9 @@
//! and push them to a HTTP endpoint.
use crate::context::{DownloadBehavior, RequestContext};
use crate::task_mgr::{self, TaskKind, BACKGROUND_RUNTIME};
use crate::tenant::size::CalculateSyntheticSizeError;
use crate::tenant::tasks::BackgroundLoopKind;
use crate::tenant::{
mgr::TenantManager, LogicalSizeCalculationCause, PageReconstructError, Tenant,
};
use crate::tenant::{mgr::TenantManager, LogicalSizeCalculationCause, Tenant};
use camino::Utf8PathBuf;
use consumption_metrics::EventType;
use pageserver_api::models::TenantState;

@@ -350,19 +349,12 @@ async fn calculate_and_log(tenant: &Tenant, cancel: &CancellationToken, ctx: &Re
// Same for the loop that fetches computed metrics.
// By using the same limiter, we centralize metrics collection for "start" and "finished" counters,
// which turns out is really handy to understand the system.
let Err(e) = tenant.calculate_synthetic_size(CAUSE, cancel, ctx).await else {
return;
};

// this error can be returned if timeline is shutting down, but it does not
// mean the synthetic size worker should terminate.
let shutting_down = matches!(
e.downcast_ref::<PageReconstructError>(),
Some(PageReconstructError::Cancelled)
);

if !shutting_down {
let tenant_shard_id = tenant.tenant_shard_id();
error!("failed to calculate synthetic size for tenant {tenant_shard_id}: {e:#}");
match tenant.calculate_synthetic_size(CAUSE, cancel, ctx).await {
Ok(_) => {}
Err(CalculateSyntheticSizeError::Cancelled) => {}
Err(e) => {
let tenant_shard_id = tenant.tenant_shard_id();
error!("failed to calculate synthetic size for tenant {tenant_shard_id}: {e:#}");
}
}
}
@@ -1135,7 +1135,10 @@ async fn tenant_size_handler(
&ctx,
)
.await
.map_err(ApiError::InternalServerError)?;
.map_err(|e| match e {
crate::tenant::size::CalculateSyntheticSizeError::Cancelled => ApiError::ShuttingDown,
other => ApiError::InternalServerError(anyhow::anyhow!(other)),
})?;

let mut sizes = None;
let accepts_html = headers

@@ -1143,9 +1146,7 @@ async fn tenant_size_handler(
.map(|v| v == "text/html")
.unwrap_or_default();
if !inputs_only.unwrap_or(false) {
let storage_model = inputs
.calculate_model()
.map_err(ApiError::InternalServerError)?;
let storage_model = inputs.calculate_model();
let size = storage_model.calculate();

// If request header expects html, return html
@@ -919,6 +919,14 @@ impl Timeline {
result.add_key(AUX_FILES_KEY);
}

#[cfg(test)]
{
let guard = self.extra_test_dense_keyspace.load();
for kr in &guard.ranges {
result.add_range(kr.clone());
}
}

Ok((
result.to_keyspace(),
/* AUX sparse key space */
@@ -509,11 +509,24 @@ pub(crate) enum GcError {
#[error(transparent)]
Remote(anyhow::Error),

// An error reading while calculating GC cutoffs
#[error(transparent)]
GcCutoffs(PageReconstructError),

// If GC was invoked for a particular timeline, this error means it didn't exist
#[error("timeline not found")]
TimelineNotFound,
}

impl From<PageReconstructError> for GcError {
fn from(value: PageReconstructError) -> Self {
match value {
PageReconstructError::Cancelled => Self::TimelineCancelled,
other => Self::GcCutoffs(other),
}
}
}

impl Tenant {
/// Yet another helper for timeline initialization.
///

@@ -1033,7 +1046,6 @@ impl Tenant {
remote_metadata,
TimelineResources {
remote_client,
deletion_queue_client: self.deletion_queue_client.clone(),
timeline_get_throttle: self.timeline_get_throttle.clone(),
},
ctx,

@@ -1059,7 +1071,6 @@ impl Tenant {
timeline_id,
&index_part.metadata,
remote_timeline_client,
self.deletion_queue_client.clone(),
)
.instrument(tracing::info_span!("timeline_delete", %timeline_id))
.await

@@ -2921,17 +2932,9 @@ impl Tenant {
.checked_sub(horizon)
.unwrap_or(Lsn(0));

let res = timeline.find_gc_cutoffs(cutoff, pitr, cancel, ctx).await;

match res {
Ok(cutoffs) => {
let old = gc_cutoffs.insert(timeline.timeline_id, cutoffs);
assert!(old.is_none());
}
Err(e) => {
tracing::warn!(timeline_id = %timeline.timeline_id, "ignoring failure to find gc cutoffs: {e:#}");
}
}
let cutoffs = timeline.find_gc_cutoffs(cutoff, pitr, cancel, ctx).await?;
let old = gc_cutoffs.insert(timeline.timeline_id, cutoffs);
assert!(old.is_none());
}

if !self.is_active() || self.cancel.is_cancelled() {

@@ -3443,7 +3446,6 @@ impl Tenant {
);
TimelineResources {
remote_client,
deletion_queue_client: self.deletion_queue_client.clone(),
timeline_get_throttle: self.timeline_get_throttle.clone(),
}
}

@@ -3553,7 +3555,7 @@ impl Tenant {
cause: LogicalSizeCalculationCause,
cancel: &CancellationToken,
ctx: &RequestContext,
) -> anyhow::Result<size::ModelInputs> {
) -> Result<size::ModelInputs, size::CalculateSyntheticSizeError> {
let logical_sizes_at_once = self
.conf
.concurrent_tenant_size_logical_size_queries

@@ -3568,8 +3570,8 @@ impl Tenant {
// See more for on the issue #2748 condenced out of the initial PR review.
let mut shared_cache = tokio::select! {
locked = self.cached_logical_sizes.lock() => locked,
_ = cancel.cancelled() => anyhow::bail!("cancelled"),
_ = self.cancel.cancelled() => anyhow::bail!("tenant is shutting down"),
_ = cancel.cancelled() => return Err(size::CalculateSyntheticSizeError::Cancelled),
_ = self.cancel.cancelled() => return Err(size::CalculateSyntheticSizeError::Cancelled),
};

size::gather_inputs(

@@ -3593,10 +3595,10 @@ impl Tenant {
cause: LogicalSizeCalculationCause,
cancel: &CancellationToken,
ctx: &RequestContext,
) -> anyhow::Result<u64> {
) -> Result<u64, size::CalculateSyntheticSizeError> {
let inputs = self.gather_size_inputs(None, cause, cancel, ctx).await?;

let size = inputs.calculate()?;
let size = inputs.calculate();

self.set_cached_synthetic_size(size);

@@ -4041,6 +4043,7 @@ mod tests {
use crate::repository::{Key, Value};
use crate::tenant::harness::*;
use crate::tenant::timeline::CompactFlags;
use crate::walrecord::NeonWalRecord;
use crate::DEFAULT_PG_VERSION;
use bytes::{Bytes, BytesMut};
use hex_literal::hex;

@@ -5264,6 +5267,9 @@ mod tests {
let cancel = CancellationToken::new();

let mut test_key = Key::from_hex("010000000033333333444444445500000000").unwrap();
let mut test_key_end = test_key;
test_key_end.field6 = NUM_KEYS as u32;
tline.add_extra_test_dense_keyspace(KeySpace::single(test_key..test_key_end));

let mut keyspace = KeySpaceAccum::new();

@@ -6223,8 +6229,8 @@ mod tests {

let cancel = CancellationToken::new();

let mut base_key = Key::from_hex("000000000033333333444444445500000000").unwrap();
base_key.field1 = AUX_KEY_PREFIX;
let base_key = Key::from_hex("620000000033333333444444445500000000").unwrap();
assert_eq!(base_key.field1, AUX_KEY_PREFIX); // in case someone accidentally changed the prefix...
let mut test_key = base_key;
let mut lsn = Lsn(0x10);

@@ -6329,6 +6335,7 @@ mod tests {
Lsn(0x20), // it's fine to not advance LSN to 0x30 while using 0x30 to get below because `get_vectored_impl` does not wait for LSN
)
.await?;
tline.add_extra_test_dense_keyspace(KeySpace::single(base_key..(base_key_nonexist.next())));

let child = tenant
.branch_timeline_test_with_layers(

@@ -6701,8 +6708,8 @@ mod tests {
}

#[tokio::test]
async fn test_simple_bottom_most_compaction() -> anyhow::Result<()> {
let harness = TenantHarness::create("test_simple_bottom_most_compaction")?;
async fn test_simple_bottom_most_compaction_images() -> anyhow::Result<()> {
let harness = TenantHarness::create("test_simple_bottom_most_compaction_images")?;
let (tenant, ctx) = harness.load().await;

fn get_key(id: u32) -> Key {

@@ -6857,4 +6864,79 @@ mod tests {

Ok(())
}

#[tokio::test]
async fn test_neon_test_record() -> anyhow::Result<()> {
let harness = TenantHarness::create("test_neon_test_record")?;
let (tenant, ctx) = harness.load().await;

fn get_key(id: u32) -> Key {
// using aux key here b/c they are guaranteed to be inside `collect_keyspace`.
let mut key = Key::from_hex("620000000033333333444444445500000000").unwrap();
key.field6 = id;
key
}

let delta1 = vec![
(
get_key(1),
Lsn(0x20),
Value::WalRecord(NeonWalRecord::wal_append(",0x20")),
),
(
get_key(1),
Lsn(0x30),
Value::WalRecord(NeonWalRecord::wal_append(",0x30")),
),
(get_key(2), Lsn(0x10), Value::Image("0x10".into())),
(
get_key(2),
Lsn(0x20),
Value::WalRecord(NeonWalRecord::wal_append(",0x20")),
|
||||
),
|
||||
(
|
||||
get_key(2),
|
||||
Lsn(0x30),
|
||||
Value::WalRecord(NeonWalRecord::wal_append(",0x30")),
|
||||
),
|
||||
(get_key(3), Lsn(0x10), Value::Image("0x10".into())),
|
||||
(
|
||||
get_key(3),
|
||||
Lsn(0x20),
|
||||
Value::WalRecord(NeonWalRecord::wal_clear()),
|
||||
),
|
||||
(get_key(4), Lsn(0x10), Value::Image("0x10".into())),
|
||||
(
|
||||
get_key(4),
|
||||
Lsn(0x20),
|
||||
Value::WalRecord(NeonWalRecord::wal_init()),
|
||||
),
|
||||
];
|
||||
let image1 = vec![(get_key(1), "0x10".into())];
|
||||
|
||||
let tline = tenant
|
||||
.create_test_timeline_with_layers(
|
||||
TIMELINE_ID,
|
||||
Lsn(0x10),
|
||||
DEFAULT_PG_VERSION,
|
||||
&ctx,
|
||||
vec![delta1], // delta layers
|
||||
vec![(Lsn(0x10), image1)], // image layers
|
||||
Lsn(0x50),
|
||||
)
|
||||
.await?;
|
||||
|
||||
assert_eq!(
|
||||
tline.get(get_key(1), Lsn(0x50), &ctx).await?,
|
||||
Bytes::from_static(b"0x10,0x20,0x30")
|
||||
);
|
||||
assert_eq!(
|
||||
tline.get(get_key(2), Lsn(0x50), &ctx).await?,
|
||||
Bytes::from_static(b"0x10,0x20,0x30")
|
||||
);
|
||||
// assert_eq!(tline.get(get_key(3), Lsn(0x50), &ctx).await?, Bytes::new());
|
||||
// assert_eq!(tline.get(get_key(4), Lsn(0x50), &ctx).await?, Bytes::new());
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -513,7 +513,7 @@ impl<'a> TenantDownloader<'a> {
|
||||
// cover our access to local storage.
|
||||
let Ok(_guard) = self.secondary_state.gate.enter() else {
|
||||
// Shutting down
|
||||
return Ok(());
|
||||
return Err(UpdateError::Cancelled);
|
||||
};
|
||||
|
||||
let tenant_shard_id = self.secondary_state.get_tenant_shard_id();
|
||||
@@ -846,7 +846,7 @@ impl<'a> TenantDownloader<'a> {
|
||||
for layer in timeline.layers {
|
||||
if self.secondary_state.cancel.is_cancelled() {
|
||||
tracing::debug!("Cancelled -- dropping out of layer loop");
|
||||
return Ok(());
|
||||
return Err(UpdateError::Cancelled);
|
||||
}
|
||||
|
||||
// Existing on-disk layers: just update their access time.
|
||||
|
||||
@@ -3,7 +3,6 @@ use std::collections::hash_map::Entry;
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::sync::Arc;
|
||||
|
||||
use anyhow::{bail, Context};
|
||||
use tokio::sync::oneshot::error::RecvError;
|
||||
use tokio::sync::Semaphore;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
@@ -11,7 +10,7 @@ use tokio_util::sync::CancellationToken;
|
||||
use crate::context::RequestContext;
|
||||
use crate::pgdatadir_mapping::CalculateLogicalSizeError;
|
||||
|
||||
use super::{LogicalSizeCalculationCause, Tenant};
|
||||
use super::{GcError, LogicalSizeCalculationCause, Tenant};
|
||||
use crate::tenant::Timeline;
|
||||
use utils::id::TimelineId;
|
||||
use utils::lsn::Lsn;
|
||||
@@ -43,6 +42,40 @@ pub struct SegmentMeta {
|
||||
pub kind: LsnKind,
|
||||
}
|
||||
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
pub(crate) enum CalculateSyntheticSizeError {
|
||||
/// Something went wrong internally while calculating the logical size at a particular branch point
|
||||
#[error("Failed to calculated logical size on timeline {timeline_id} at {lsn}: {error}")]
|
||||
LogicalSize {
|
||||
timeline_id: TimelineId,
|
||||
lsn: Lsn,
|
||||
error: CalculateLogicalSizeError,
|
||||
},
|
||||
|
||||
/// Something went wrong internally when calculating GC parameters at the start of the size calculation
|
||||
#[error(transparent)]
|
||||
GcInfo(GcError),
|
||||
|
||||
/// Totally unexpected errors, like panics joining a task
|
||||
#[error(transparent)]
|
||||
Fatal(anyhow::Error),
|
||||
|
||||
/// Tenant shut down while calculating size
|
||||
#[error("Cancelled")]
|
||||
Cancelled,
|
||||
}
|
||||
|
||||
impl From<GcError> for CalculateSyntheticSizeError {
|
||||
fn from(value: GcError) -> Self {
|
||||
match value {
|
||||
GcError::TenantCancelled | GcError::TimelineCancelled => {
|
||||
CalculateSyntheticSizeError::Cancelled
|
||||
}
|
||||
other => CalculateSyntheticSizeError::GcInfo(other),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl SegmentMeta {
|
||||
fn size_needed(&self) -> bool {
|
||||
match self.kind {
|
||||
@@ -116,12 +149,9 @@ pub(super) async fn gather_inputs(
|
||||
cause: LogicalSizeCalculationCause,
|
||||
cancel: &CancellationToken,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<ModelInputs> {
|
||||
) -> Result<ModelInputs, CalculateSyntheticSizeError> {
|
||||
// refresh is needed to update gc related pitr_cutoff and horizon_cutoff
|
||||
tenant
|
||||
.refresh_gc_info(cancel, ctx)
|
||||
.await
|
||||
.context("Failed to refresh gc_info before gathering inputs")?;
|
||||
tenant.refresh_gc_info(cancel, ctx).await?;
|
||||
|
||||
// Collect information about all the timelines
|
||||
let mut timelines = tenant.list_timelines();
|
||||
@@ -327,6 +357,12 @@ pub(super) async fn gather_inputs(
|
||||
)
|
||||
.await?;
|
||||
|
||||
if tenant.cancel.is_cancelled() {
|
||||
// If we're shutting down, return an error rather than a sparse result that might include some
|
||||
// timelines from before we started shutting down
|
||||
return Err(CalculateSyntheticSizeError::Cancelled);
|
||||
}
|
||||
|
||||
Ok(ModelInputs {
|
||||
segments,
|
||||
timeline_inputs,
|
||||
@@ -335,9 +371,8 @@ pub(super) async fn gather_inputs(
|
||||
|
||||
/// Augment 'segments' with logical sizes
|
||||
///
|
||||
/// this will probably conflict with on-demand downloaded layers, or at least force them all
|
||||
/// to be downloaded
|
||||
///
|
||||
/// This will leave segments' sizes as None if the Timeline associated with the segment is deleted concurrently
|
||||
/// (i.e. we cannot read its logical size at a particular LSN).
|
||||
async fn fill_logical_sizes(
|
||||
timelines: &[Arc<Timeline>],
|
||||
segments: &mut [SegmentMeta],
|
||||
@@ -345,7 +380,7 @@ async fn fill_logical_sizes(
|
||||
logical_size_cache: &mut HashMap<(TimelineId, Lsn), u64>,
|
||||
cause: LogicalSizeCalculationCause,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
) -> Result<(), CalculateSyntheticSizeError> {
|
||||
let timeline_hash: HashMap<TimelineId, Arc<Timeline>> = HashMap::from_iter(
|
||||
timelines
|
||||
.iter()
|
||||
@@ -387,7 +422,7 @@ async fn fill_logical_sizes(
|
||||
}
|
||||
|
||||
// Perform the size lookups
|
||||
let mut have_any_error = false;
|
||||
let mut have_any_error = None;
|
||||
while let Some(res) = joinset.join_next().await {
|
||||
// each of these come with Result<anyhow::Result<_>, JoinError>
|
||||
// because of spawn + spawn_blocking
|
||||
@@ -398,21 +433,36 @@ async fn fill_logical_sizes(
|
||||
Err(join_error) => {
|
||||
// cannot really do anything, as this panic is likely a bug
|
||||
error!("task that calls spawn_ondemand_logical_size_calculation panicked: {join_error:#}");
|
||||
have_any_error = true;
|
||||
|
||||
have_any_error = Some(CalculateSyntheticSizeError::Fatal(
|
||||
anyhow::anyhow!(join_error)
|
||||
.context("task that calls spawn_ondemand_logical_size_calculation"),
|
||||
));
|
||||
}
|
||||
Ok(Err(recv_result_error)) => {
|
||||
// cannot really do anything, as this panic is likely a bug
|
||||
error!("failed to receive logical size query result: {recv_result_error:#}");
|
||||
have_any_error = true;
|
||||
have_any_error = Some(CalculateSyntheticSizeError::Fatal(
|
||||
anyhow::anyhow!(recv_result_error)
|
||||
.context("Receiving logical size query result"),
|
||||
));
|
||||
}
|
||||
Ok(Ok(TimelineAtLsnSizeResult(timeline, lsn, Err(error)))) => {
|
||||
if !matches!(error, CalculateLogicalSizeError::Cancelled) {
|
||||
if matches!(error, CalculateLogicalSizeError::Cancelled) {
|
||||
// Skip this: it's okay if one timeline among many is shutting down while we
|
||||
// calculate inputs for the overall tenant.
|
||||
continue;
|
||||
} else {
|
||||
warn!(
|
||||
timeline_id=%timeline.timeline_id,
|
||||
"failed to calculate logical size at {lsn}: {error:#}"
|
||||
);
|
||||
have_any_error = Some(CalculateSyntheticSizeError::LogicalSize {
|
||||
timeline_id: timeline.timeline_id,
|
||||
lsn,
|
||||
error,
|
||||
});
|
||||
}
|
||||
have_any_error = true;
|
||||
}
|
||||
Ok(Ok(TimelineAtLsnSizeResult(timeline, lsn, Ok(size)))) => {
|
||||
debug!(timeline_id=%timeline.timeline_id, %lsn, size, "size calculated");
|
||||
@@ -426,10 +476,10 @@ async fn fill_logical_sizes(
|
||||
// prune any keys not needed anymore; we record every used key and added key.
|
||||
logical_size_cache.retain(|key, _| sizes_needed.contains_key(key));
|
||||
|
||||
if have_any_error {
|
||||
if let Some(error) = have_any_error {
|
||||
// we cannot complete this round, because we are missing data.
|
||||
// we have however cached all we were able to request calculation on.
|
||||
anyhow::bail!("failed to calculate some logical_sizes");
|
||||
return Err(error);
|
||||
}
|
||||
|
||||
// Insert the looked up sizes to the Segments
|
||||
@@ -443,33 +493,28 @@ async fn fill_logical_sizes(
|
||||
|
||||
if let Some(Some(size)) = sizes_needed.get(&(timeline_id, lsn)) {
|
||||
seg.segment.size = Some(*size);
|
||||
} else {
|
||||
bail!("could not find size at {} in timeline {}", lsn, timeline_id);
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
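The have_any_error change above swaps a boolean flag for an Option holding a concrete error, so the loop can keep draining the join set (and caching successful results) and still surface a typed error at the end. A rough, self-contained sketch of that shape (tokio assumed; the task bodies and error type here are invented purely for illustration):

use tokio::task::JoinSet;

#[derive(Debug)]
enum SizeError {
    Fatal(String),
    Logical { id: u32, msg: String },
}

#[tokio::main]
async fn main() {
    let mut joinset = JoinSet::new();
    for id in 0u32..4 {
        joinset.spawn(async move {
            // Pretend one of the per-timeline calculations fails.
            if id == 2 {
                (id, Err(format!("boom on {id}")))
            } else {
                (id, Ok(u64::from(id) * 10))
            }
        });
    }

    // Keep draining after a failure so successful results are still recorded,
    // but remember a concrete error to report once everything has been joined.
    let mut have_any_error: Option<SizeError> = None;
    let mut total = 0u64;
    while let Some(res) = joinset.join_next().await {
        match res {
            Err(join_error) => {
                have_any_error = Some(SizeError::Fatal(join_error.to_string()));
            }
            Ok((id, Err(msg))) => {
                have_any_error = Some(SizeError::Logical { id, msg });
            }
            Ok((_, Ok(size))) => total += size,
        }
    }

    let result: Result<u64, SizeError> = match have_any_error {
        Some(error) => Err(error),
        None => Ok(total),
    };
    println!("{result:?}");
}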
|
||||
|
||||
impl ModelInputs {
|
||||
pub fn calculate_model(&self) -> anyhow::Result<tenant_size_model::StorageModel> {
|
||||
pub fn calculate_model(&self) -> tenant_size_model::StorageModel {
|
||||
// Convert SegmentMetas into plain Segments
|
||||
let storage = StorageModel {
|
||||
StorageModel {
|
||||
segments: self
|
||||
.segments
|
||||
.iter()
|
||||
.map(|seg| seg.segment.clone())
|
||||
.collect(),
|
||||
};
|
||||
|
||||
Ok(storage)
|
||||
}
|
||||
}
|
||||
|
||||
// calculate total project size
|
||||
pub fn calculate(&self) -> anyhow::Result<u64> {
|
||||
let storage = self.calculate_model()?;
|
||||
pub fn calculate(&self) -> u64 {
|
||||
let storage = self.calculate_model();
|
||||
let sizes = storage.calculate();
|
||||
|
||||
Ok(sizes.total_size)
|
||||
sizes.total_size
|
||||
}
|
||||
}
|
||||
|
||||
@@ -656,7 +701,7 @@ fn verify_size_for_multiple_branches() {
|
||||
"#;
|
||||
let inputs: ModelInputs = serde_json::from_str(doc).unwrap();
|
||||
|
||||
assert_eq!(inputs.calculate().unwrap(), 37_851_408);
|
||||
assert_eq!(inputs.calculate(), 37_851_408);
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -711,7 +756,7 @@ fn verify_size_for_one_branch() {
|
||||
|
||||
let model: ModelInputs = serde_json::from_str(doc).unwrap();
|
||||
|
||||
let res = model.calculate_model().unwrap().calculate();
|
||||
let res = model.calculate_model().calculate();
|
||||
|
||||
println!("calculated synthetic size: {}", res.total_size);
|
||||
println!("result: {:?}", serde_json::to_string(&res.segments));
|
||||
|
||||
@@ -318,7 +318,7 @@ pub(crate) struct LayerFringe {
|
||||
#[derive(Debug)]
|
||||
struct LayerKeyspace {
|
||||
layer: ReadableLayer,
|
||||
target_keyspace: Vec<KeySpace>,
|
||||
target_keyspace: KeySpaceRandomAccum,
|
||||
}
|
||||
|
||||
impl LayerFringe {
|
||||
@@ -342,17 +342,13 @@ impl LayerFringe {
|
||||
_,
|
||||
LayerKeyspace {
|
||||
layer,
|
||||
target_keyspace,
|
||||
mut target_keyspace,
|
||||
},
|
||||
)) => {
|
||||
let mut keyspace = KeySpaceRandomAccum::new();
|
||||
for ks in target_keyspace {
|
||||
for part in ks.ranges {
|
||||
keyspace.add_range(part);
|
||||
}
|
||||
}
|
||||
Some((layer, keyspace.consume_keyspace(), read_desc.lsn_range))
|
||||
}
|
||||
)) => Some((
|
||||
layer,
|
||||
target_keyspace.consume_keyspace(),
|
||||
read_desc.lsn_range,
|
||||
)),
|
||||
None => unreachable!("fringe internals are always consistent"),
|
||||
}
|
||||
}
|
||||
@@ -367,16 +363,18 @@ impl LayerFringe {
|
||||
let entry = self.layers.entry(layer_id.clone());
|
||||
match entry {
|
||||
Entry::Occupied(mut entry) => {
|
||||
entry.get_mut().target_keyspace.push(keyspace);
|
||||
entry.get_mut().target_keyspace.add_keyspace(keyspace);
|
||||
}
|
||||
Entry::Vacant(entry) => {
|
||||
self.planned_reads_by_lsn.push(ReadDesc {
|
||||
lsn_range,
|
||||
layer_id: layer_id.clone(),
|
||||
});
|
||||
let mut accum = KeySpaceRandomAccum::new();
|
||||
accum.add_keyspace(keyspace);
|
||||
entry.insert(LayerKeyspace {
|
||||
layer,
|
||||
target_keyspace: vec![keyspace],
|
||||
target_keyspace: accum,
|
||||
});
|
||||
}
|
||||
}
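The fringe now accumulates key ranges directly into a KeySpaceRandomAccum instead of collecting a Vec<KeySpace> and flattening it when the read is planned. The idea is accumulate-then-consume: ranges arrive one by one and the merged key space is produced exactly once. A toy, self-contained version of that shape (KeySpaceRandomAccum itself is more involved; this only illustrates the accumulate/consume split):

use std::ops::Range;

#[derive(Default)]
struct RangeAccum {
    ranges: Vec<Range<u32>>,
}

impl RangeAccum {
    fn add_range(&mut self, r: Range<u32>) {
        self.ranges.push(r);
    }

    // Produce the merged, sorted ranges exactly once, consuming the accumulator.
    fn consume(mut self) -> Vec<Range<u32>> {
        self.ranges.sort_by_key(|r| r.start);
        let mut merged: Vec<Range<u32>> = Vec::new();
        for r in self.ranges.drain(..) {
            match merged.last_mut() {
                Some(last) if r.start <= last.end => last.end = last.end.max(r.end),
                _ => merged.push(r),
            }
        }
        merged
    }
}

fn main() {
    let mut accum = RangeAccum::default();
    accum.add_range(10..20);
    accum.add_range(15..30);
    accum.add_range(40..50);
    assert_eq!(accum.consume(), vec![10..30, 40..50]);
    println!("ok");
}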
|
||||
|
||||
@@ -219,7 +219,6 @@ pub struct DeltaLayerInner {
|
||||
// values copied from summary
|
||||
index_start_blk: u32,
|
||||
index_root_blk: u32,
|
||||
lsn_range: Range<Lsn>,
|
||||
|
||||
file: VirtualFile,
|
||||
file_id: FileId,
|
||||
@@ -785,7 +784,6 @@ impl DeltaLayerInner {
|
||||
file_id,
|
||||
index_start_blk: actual_summary.index_start_blk,
|
||||
index_root_blk: actual_summary.index_root_blk,
|
||||
lsn_range: actual_summary.lsn_range,
|
||||
max_vectored_read_bytes,
|
||||
}))
|
||||
}
|
||||
@@ -911,7 +909,7 @@ impl DeltaLayerInner {
|
||||
|
||||
let reads = Self::plan_reads(
|
||||
&keyspace,
|
||||
lsn_range,
|
||||
lsn_range.clone(),
|
||||
data_end_offset,
|
||||
index_reader,
|
||||
planner,
|
||||
@@ -924,7 +922,7 @@ impl DeltaLayerInner {
|
||||
self.do_reads_and_update_state(reads, reconstruct_state, ctx)
|
||||
.await;
|
||||
|
||||
reconstruct_state.on_lsn_advanced(&keyspace, self.lsn_range.start);
|
||||
reconstruct_state.on_lsn_advanced(&keyspace, lsn_range.start);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -62,6 +62,7 @@ use std::{
|
||||
ops::ControlFlow,
|
||||
};
|
||||
|
||||
use crate::metrics::GetKind;
|
||||
use crate::pgdatadir_mapping::MAX_AUX_FILE_V2_DELTAS;
|
||||
use crate::{
|
||||
aux_file::AuxFileSizeEstimator,
|
||||
@@ -75,7 +76,6 @@ use crate::{
|
||||
disk_usage_eviction_task::DiskUsageEvictionInfo,
|
||||
pgdatadir_mapping::CollectKeySpaceError,
|
||||
};
|
||||
use crate::{deletion_queue::DeletionQueueClient, metrics::GetKind};
|
||||
use crate::{
|
||||
disk_usage_eviction_task::finite_f32,
|
||||
tenant::storage_layer::{
|
||||
@@ -205,7 +205,6 @@ fn drop_wlock<T>(rlock: tokio::sync::RwLockWriteGuard<'_, T>) {
|
||||
/// The outward-facing resources required to build a Timeline
|
||||
pub struct TimelineResources {
|
||||
pub remote_client: RemoteTimelineClient,
|
||||
pub deletion_queue_client: DeletionQueueClient,
|
||||
pub timeline_get_throttle: Arc<
|
||||
crate::tenant::throttle::Throttle<&'static crate::metrics::tenant_throttling::TimelineGet>,
|
||||
>,
|
||||
@@ -426,6 +425,14 @@ pub struct Timeline {
|
||||
|
||||
/// Indicate whether aux file v2 storage is enabled.
|
||||
pub(crate) last_aux_file_policy: AtomicAuxFilePolicy,
|
||||
|
||||
/// Some test cases directly place keys into the timeline without actually modifying the directory
|
||||
/// keys (i.e., DB_DIR). The test cases creating such keys will put the keyspaces here, so that
|
||||
/// these keys won't get garbage-collected during compaction/GC. This field only modifies the dense
|
||||
/// keyspace return value of `collect_keyspace`. For sparse keyspaces, use AUX keys for testing, and
|
||||
/// in the future, add `extra_test_sparse_keyspace` if necessary.
|
||||
#[cfg(test)]
|
||||
pub(crate) extra_test_dense_keyspace: ArcSwap<KeySpace>,
|
||||
}
|
||||
|
||||
pub struct WalReceiverInfo {
|
||||
@@ -2344,6 +2351,9 @@ impl Timeline {
|
||||
aux_file_size_estimator: AuxFileSizeEstimator::new(aux_file_metrics),
|
||||
|
||||
last_aux_file_policy: AtomicAuxFilePolicy::new(aux_file_policy),
|
||||
|
||||
#[cfg(test)]
|
||||
extra_test_dense_keyspace: ArcSwap::new(Arc::new(KeySpace::default())),
|
||||
};
|
||||
result.repartition_threshold =
|
||||
result.get_checkpoint_distance() / REPARTITION_FREQ_IN_CHECKPOINT_DISTANCE;
|
||||
@@ -4812,7 +4822,7 @@ impl Timeline {
|
||||
pitr: Duration,
|
||||
cancel: &CancellationToken,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<GcCutoffs> {
|
||||
) -> Result<GcCutoffs, PageReconstructError> {
|
||||
let _timer = self
|
||||
.metrics
|
||||
.find_gc_cutoffs_histo
|
||||
@@ -5562,6 +5572,13 @@ impl Timeline {
|
||||
}
|
||||
Ok(layers)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub(crate) fn add_extra_test_dense_keyspace(&self, ks: KeySpace) {
|
||||
let mut keyspace = self.extra_test_dense_keyspace.load().as_ref().clone();
|
||||
keyspace.merge(&ks);
|
||||
self.extra_test_dense_keyspace.store(Arc::new(keyspace));
|
||||
}
|
||||
}
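add_extra_test_dense_keyspace above uses the usual arc-swap update pattern: load a snapshot, clone it, merge, and store a fresh Arc so concurrent readers never observe a half-updated key space. A small self-contained sketch of that pattern (KeySet is a stand-in for KeySpace; the arc_swap crate is assumed):

use arc_swap::ArcSwap;
use std::sync::Arc;

#[derive(Clone, Debug, Default)]
struct KeySet {
    keys: Vec<u32>,
}

impl KeySet {
    fn merge(&mut self, other: &KeySet) {
        self.keys.extend_from_slice(&other.keys);
        self.keys.sort_unstable();
        self.keys.dedup();
    }
}

fn main() {
    let shared = ArcSwap::new(Arc::new(KeySet::default()));

    // Writer: copy the current value, merge, and publish a new Arc.
    let mut updated = shared.load().as_ref().clone();
    updated.merge(&KeySet { keys: vec![3, 1, 2] });
    shared.store(Arc::new(updated));

    // Reader: observes the atomically published snapshot.
    let snapshot = shared.load();
    assert_eq!(snapshot.keys, vec![1, 2, 3]);
    println!("{:?}", snapshot.keys);
}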
|
||||
|
||||
type TraversalPathItem = (ValueReconstructResult, Lsn, TraversalId);
|
||||
|
||||
@@ -11,7 +11,6 @@ use utils::{crashsafe, fs_ext, id::TimelineId, pausable_failpoint};
|
||||
|
||||
use crate::{
|
||||
config::PageServerConf,
|
||||
deletion_queue::DeletionQueueClient,
|
||||
task_mgr::{self, TaskKind},
|
||||
tenant::{
|
||||
metadata::TimelineMetadata,
|
||||
@@ -263,7 +262,6 @@ impl DeleteTimelineFlow {
|
||||
timeline_id: TimelineId,
|
||||
local_metadata: &TimelineMetadata,
|
||||
remote_client: RemoteTimelineClient,
|
||||
deletion_queue_client: DeletionQueueClient,
|
||||
) -> anyhow::Result<()> {
|
||||
// Note: here we even skip populating layer map. Timeline is essentially uninitialized.
|
||||
// RemoteTimelineClient is the only functioning part.
|
||||
@@ -274,7 +272,6 @@ impl DeleteTimelineFlow {
|
||||
None, // Ancestor is not needed for deletion.
|
||||
TimelineResources {
|
||||
remote_client,
|
||||
deletion_queue_client,
|
||||
timeline_get_throttle: tenant.timeline_get_throttle.clone(),
|
||||
},
|
||||
// Important. We dont pass ancestor above because it can be missing.
|
||||
|
||||
@@ -49,6 +49,19 @@ pub enum NeonWalRecord {
|
||||
file_path: String,
|
||||
content: Option<Bytes>,
|
||||
},
|
||||
|
||||
/// A testing record for unit testing purposes. It supports appending data to an existing image, or clearing it.
|
||||
#[cfg(test)]
|
||||
Test {
|
||||
/// Append a string to the image.
|
||||
append: String,
|
||||
/// Clear the image before appending.
|
||||
clear: bool,
|
||||
/// Treat this record as an init record. `clear` should be set to true if this field is set
|
||||
/// to true. Replaying this record does not require the WAL history. See [`NeonWalRecord::will_init`] and
|
||||
/// its references in `timeline.rs`.
|
||||
will_init: bool,
|
||||
},
|
||||
}
|
||||
|
||||
impl NeonWalRecord {
|
||||
@@ -58,11 +71,39 @@ impl NeonWalRecord {
|
||||
// If you change this function, you'll also need to change ValueBytes::will_init
|
||||
match self {
|
||||
NeonWalRecord::Postgres { will_init, rec: _ } => *will_init,
|
||||
|
||||
#[cfg(test)]
|
||||
NeonWalRecord::Test { will_init, .. } => *will_init,
|
||||
// None of the special neon record types currently initialize the page
|
||||
_ => false,
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub(crate) fn wal_append(s: impl AsRef<str>) -> Self {
|
||||
Self::Test {
|
||||
append: s.as_ref().to_string(),
|
||||
clear: false,
|
||||
will_init: false,
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub(crate) fn wal_clear() -> Self {
|
||||
Self::Test {
|
||||
append: "".to_string(),
|
||||
clear: true,
|
||||
will_init: false,
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub(crate) fn wal_init() -> Self {
|
||||
Self::Test {
|
||||
append: "".to_string(),
|
||||
clear: true,
|
||||
will_init: true,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// DecodedBkpBlock represents per-page data contained in a WAL record.
|
||||
|
||||
@@ -244,6 +244,20 @@ pub(crate) fn apply_in_neon(
|
||||
let mut writer = page.writer();
|
||||
dir.ser_into(&mut writer)?;
|
||||
}
|
||||
#[cfg(test)]
|
||||
NeonWalRecord::Test {
|
||||
append,
|
||||
clear,
|
||||
will_init,
|
||||
} => {
|
||||
if *will_init {
|
||||
assert!(*clear, "init record must be clear to ensure correctness");
|
||||
}
|
||||
if *clear {
|
||||
page.clear();
|
||||
}
|
||||
page.put_slice(append.as_bytes());
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
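Taken together, the Test record variant and its handling in apply_in_neon give the unit tests a tiny WAL redo model: start from the latest image, then apply clear/append records in LSN order, with will_init records standing on their own. A simplified, self-contained model of those semantics (a page is just a String here; this is not the real redo path):

#[derive(Debug)]
enum TestRecord {
    Append(String),
    Clear,
    // An init record stands on its own: it drops history and sets fresh content.
    Init(String),
}

fn replay(image: &str, records: &[TestRecord]) -> String {
    let mut page = image.to_string();
    for rec in records {
        match rec {
            TestRecord::Append(s) => page.push_str(s),
            TestRecord::Clear => page.clear(),
            TestRecord::Init(s) => {
                page.clear();
                page.push_str(s);
            }
        }
    }
    page
}

fn main() {
    // Mirrors the assertion in the test above: image "0x10" plus two appends.
    let records = vec![
        TestRecord::Append(",0x20".to_string()),
        TestRecord::Append(",0x30".to_string()),
    ];
    assert_eq!(replay("0x10", &records), "0x10,0x20,0x30");

    // An init record ignores whatever was on the page before it.
    let reinit = vec![TestRecord::Init("fresh".to_string())];
    assert_eq!(replay("stale contents", &reinit), "fresh");
    println!("ok");
}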
|
||||
|
||||
@@ -1,19 +1,8 @@
|
||||
From 0b0194a57bd0f3598bd57dbedd0df3932330169d Mon Sep 17 00:00:00 2001
|
||||
From: Heikki Linnakangas <heikki.linnakangas@iki.fi>
|
||||
Date: Fri, 2 Feb 2024 22:26:45 +0200
|
||||
Subject: [PATCH 1/1] Make v0.6.0 work with Neon
|
||||
|
||||
Now that the WAL-logging happens as a separate step at the end of the
|
||||
build, we need a few neon-specific hints to make it work.
|
||||
---
|
||||
src/hnswbuild.c | 36 ++++++++++++++++++++++++++++++++++++
|
||||
1 file changed, 36 insertions(+)
|
||||
|
||||
diff --git a/src/hnswbuild.c b/src/hnswbuild.c
|
||||
index 680789b..ec54dea 100644
|
||||
index dcfb2bd..d5189ee 100644
|
||||
--- a/src/hnswbuild.c
|
||||
+++ b/src/hnswbuild.c
|
||||
@@ -840,9 +840,17 @@ HnswParallelBuildMain(dsm_segment *seg, shm_toc *toc)
|
||||
@@ -860,9 +860,17 @@ HnswParallelBuildMain(dsm_segment *seg, shm_toc *toc)
|
||||
|
||||
hnswarea = shm_toc_lookup(toc, PARALLEL_KEY_HNSW_AREA, false);
|
||||
|
||||
@@ -31,7 +20,7 @@ index 680789b..ec54dea 100644
|
||||
/* Close relations within worker */
|
||||
index_close(indexRel, indexLockmode);
|
||||
table_close(heapRel, heapLockmode);
|
||||
@@ -1089,13 +1097,41 @@ BuildIndex(Relation heap, Relation index, IndexInfo *indexInfo,
|
||||
@@ -1117,12 +1125,38 @@ BuildIndex(Relation heap, Relation index, IndexInfo *indexInfo,
|
||||
SeedRandom(42);
|
||||
#endif
|
||||
|
||||
@@ -43,14 +32,13 @@ index 680789b..ec54dea 100644
|
||||
|
||||
BuildGraph(buildstate, forkNum);
|
||||
|
||||
- if (RelationNeedsWAL(index) || forkNum == INIT_FORKNUM)
|
||||
+#ifdef NEON_SMGR
|
||||
+ smgr_finish_unlogged_build_phase_1(RelationGetSmgr(index));
|
||||
+#endif
|
||||
+
|
||||
if (RelationNeedsWAL(index))
|
||||
+ {
|
||||
log_newpage_range(index, forkNum, 0, RelationGetNumberOfBlocks(index), true);
|
||||
|
||||
+ if (RelationNeedsWAL(index) || forkNum == INIT_FORKNUM) {
|
||||
log_newpage_range(index, forkNum, 0, RelationGetNumberOfBlocksInFork(index, forkNum), true);
|
||||
+#ifdef NEON_SMGR
|
||||
+ {
|
||||
+#if PG_VERSION_NUM >= 160000
|
||||
@@ -60,7 +48,7 @@ index 680789b..ec54dea 100644
|
||||
+#endif
|
||||
+
|
||||
+ SetLastWrittenLSNForBlockRange(XactLastRecEnd, rlocator,
|
||||
+ MAIN_FORKNUM, 0, RelationGetNumberOfBlocks(index));
|
||||
+ MAIN_FORKNUM, 0, RelationGetNumberOfBlocks(index));
|
||||
+ SetLastWrittenLSNForRelation(XactLastRecEnd, rlocator, MAIN_FORKNUM);
|
||||
+ }
|
||||
+#endif
|
||||
@@ -69,10 +57,6 @@ index 680789b..ec54dea 100644
|
||||
+#ifdef NEON_SMGR
|
||||
+ smgr_end_unlogged_build(RelationGetSmgr(index));
|
||||
+#endif
|
||||
+
|
||||
|
||||
FreeBuildState(buildstate);
|
||||
}
|
||||
|
||||
--
|
||||
2.39.2
|
||||
|
||||
|
||||
@@ -3112,12 +3112,12 @@ neon_read_slru_segment(SMgrRelation reln, const char* path, int segno, void* buf
|
||||
request_lsn = UINT64_MAX;
|
||||
|
||||
/*
|
||||
* GetRedoStartLsn() returns LSN of basebackup. We know that the SLRU
|
||||
* GetRedoStartLsn() returns LSN of the basebackup. We know that the SLRU
|
||||
* segment has not changed since the basebackup, because in order to
|
||||
* modify it, we would have had to download it already. And once
|
||||
* downloaded, we never evict SLRU segments from local disk.
|
||||
*/
|
||||
not_modified_since = GetRedoStartLsn();
|
||||
not_modified_since = nm_adjust_lsn(GetRedoStartLsn());
|
||||
|
||||
SlruKind kind;
|
||||
|
||||
|
||||
@@ -286,7 +286,6 @@ WalProposerPoll(WalProposer *wp)
|
||||
void
|
||||
WalProposerStart(WalProposer *wp)
|
||||
{
|
||||
|
||||
/* Initiate connections to all safekeeper nodes */
|
||||
for (int i = 0; i < wp->n_safekeepers; i++)
|
||||
{
|
||||
|
||||
@@ -63,6 +63,8 @@ char *wal_acceptors_list = "";
|
||||
int wal_acceptor_reconnect_timeout = 1000;
|
||||
int wal_acceptor_connection_timeout = 10000;
|
||||
|
||||
/* Set to true in the walproposer bgw. */
|
||||
static bool am_walproposer;
|
||||
static WalproposerShmemState *walprop_shared;
|
||||
static WalProposerConfig walprop_config;
|
||||
static XLogRecPtr sentPtr = InvalidXLogRecPtr;
|
||||
@@ -76,6 +78,7 @@ static HotStandbyFeedback agg_hs_feedback;
|
||||
|
||||
static void nwp_shmem_startup_hook(void);
|
||||
static void nwp_register_gucs(void);
|
||||
static void assign_neon_safekeepers(const char *newval, void *extra);
|
||||
static void nwp_prepare_shmem(void);
|
||||
static uint64 backpressure_lag_impl(void);
|
||||
static bool backpressure_throttling_impl(void);
|
||||
@@ -116,7 +119,8 @@ init_walprop_config(bool syncSafekeepers)
|
||||
{
|
||||
walprop_config.neon_tenant = neon_tenant;
|
||||
walprop_config.neon_timeline = neon_timeline;
|
||||
walprop_config.safekeepers_list = wal_acceptors_list;
|
||||
/* WalProposerCreate scribbles directly on it, so pstrdup */
|
||||
walprop_config.safekeepers_list = pstrdup(wal_acceptors_list);
|
||||
walprop_config.safekeeper_reconnect_timeout = wal_acceptor_reconnect_timeout;
|
||||
walprop_config.safekeeper_connection_timeout = wal_acceptor_connection_timeout;
|
||||
walprop_config.wal_segment_size = wal_segment_size;
|
||||
@@ -156,6 +160,7 @@ WalProposerMain(Datum main_arg)
|
||||
|
||||
init_walprop_config(false);
|
||||
walprop_pg_init_bgworker();
|
||||
am_walproposer = true;
|
||||
walprop_pg_load_libpqwalreceiver();
|
||||
|
||||
wp = WalProposerCreate(&walprop_config, walprop_pg);
|
||||
@@ -194,10 +199,10 @@ nwp_register_gucs(void)
|
||||
NULL, /* long_desc */
|
||||
&wal_acceptors_list, /* valueAddr */
|
||||
"", /* bootValue */
|
||||
PGC_POSTMASTER,
|
||||
PGC_SIGHUP,
|
||||
GUC_LIST_INPUT, /* extensions can't use*
|
||||
* GUC_LIST_QUOTE */
|
||||
NULL, NULL, NULL);
|
||||
NULL, assign_neon_safekeepers, NULL);
|
||||
|
||||
DefineCustomIntVariable(
|
||||
"neon.safekeeper_reconnect_timeout",
|
||||
@@ -220,6 +225,33 @@ nwp_register_gucs(void)
|
||||
NULL, NULL, NULL);
|
||||
}
|
||||
|
||||
/*
|
||||
* GUC assign_hook for neon.safekeepers. Restarts walproposer through FATAL if
|
||||
* the list changed.
|
||||
*/
|
||||
static void
|
||||
assign_neon_safekeepers(const char *newval, void *extra)
|
||||
{
|
||||
if (!am_walproposer)
|
||||
return;
|
||||
|
||||
if (!newval) {
|
||||
/* should never happen */
|
||||
wpg_log(FATAL, "neon.safekeepers is empty");
|
||||
}
|
||||
|
||||
/*
|
||||
* TODO: restarting through FATAL is stupid and introduces 1s delay before
|
||||
* next bgw start. We should refactor walproposer to allow graceful exit and
|
||||
* thus remove this delay.
|
||||
*/
|
||||
if (strcmp(wal_acceptors_list, newval) != 0)
|
||||
{
|
||||
wpg_log(FATAL, "restarting walproposer to change safekeeper list from %s to %s",
|
||||
wal_acceptors_list, newval);
|
||||
}
|
||||
}
|
||||
|
||||
/* Check if we need to suspend inserts because of lagging replication. */
|
||||
static uint64
|
||||
backpressure_lag_impl(void)
|
||||
@@ -368,7 +400,7 @@ walprop_register_bgworker(void)
|
||||
snprintf(bgw.bgw_function_name, BGW_MAXLEN, "WalProposerMain");
|
||||
snprintf(bgw.bgw_name, BGW_MAXLEN, "WAL proposer");
|
||||
snprintf(bgw.bgw_type, BGW_MAXLEN, "WAL proposer");
|
||||
bgw.bgw_restart_time = 5;
|
||||
bgw.bgw_restart_time = 1;
|
||||
bgw.bgw_notify_pid = 0;
|
||||
bgw.bgw_main_arg = (Datum) 0;
|
||||
|
||||
@@ -1238,13 +1270,8 @@ WalSndLoop(WalProposer *wp)
|
||||
{
|
||||
XLogRecPtr flushPtr;
|
||||
|
||||
/* Clear any already-pending wakeups */
|
||||
ResetLatch(MyLatch);
|
||||
|
||||
for (;;)
|
||||
{
|
||||
CHECK_FOR_INTERRUPTS();
|
||||
|
||||
XLogBroadcastWalProposer(wp);
|
||||
WalProposerPoll(wp);
|
||||
}
|
||||
@@ -1775,6 +1802,20 @@ walprop_pg_wait_event_set(WalProposer *wp, long timeout, Safekeeper **sk, uint32
|
||||
late_cv_trigger = ConditionVariableCancelSleep();
|
||||
#endif
|
||||
|
||||
CHECK_FOR_INTERRUPTS();
|
||||
|
||||
/*
|
||||
* Process config if requested. This restarts walproposer if safekeepers
|
||||
* list changed. Don't do that for sync-safekeepers because quite probably
|
||||
* it (re-reading config) won't work without some effort, and
|
||||
* sync-safekeepers should be quick to finish anyway.
|
||||
*/
|
||||
if (!wp->config->syncSafekeepers && ConfigReloadPending)
|
||||
{
|
||||
ConfigReloadPending = false;
|
||||
ProcessConfigFile(PGC_SIGHUP);
|
||||
}
|
||||
|
||||
/*
|
||||
* If wait is terminated by latch set (walsenders' latch is set on each
|
||||
* wal flush). (no need for pm death check due to WL_EXIT_ON_PM_DEATH)
|
||||
|
||||
@@ -1,16 +1,183 @@
|
||||
use measured::FixedCardinalityLabel;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::fmt;
|
||||
use std::fmt::{self, Display};
|
||||
|
||||
use crate::auth::IpPattern;
|
||||
|
||||
use crate::intern::{BranchIdInt, EndpointIdInt, ProjectIdInt};
|
||||
use crate::proxy::retry::ShouldRetry;
|
||||
|
||||
/// Generic error response with human-readable description.
|
||||
/// Note that we can't always present it to user as is.
|
||||
#[derive(Debug, Deserialize)]
|
||||
pub struct ConsoleError {
|
||||
pub error: Box<str>,
|
||||
#[serde(skip)]
|
||||
pub http_status_code: http::StatusCode,
|
||||
pub status: Option<Status>,
|
||||
}
|
||||
|
||||
impl ConsoleError {
|
||||
pub fn get_reason(&self) -> Reason {
|
||||
self.status
|
||||
.as_ref()
|
||||
.and_then(|s| s.details.error_info.as_ref())
|
||||
.map(|e| e.reason)
|
||||
.unwrap_or(Reason::Unknown)
|
||||
}
|
||||
pub fn get_user_facing_message(&self) -> String {
|
||||
use super::provider::errors::REQUEST_FAILED;
|
||||
self.status
|
||||
.as_ref()
|
||||
.and_then(|s| s.details.user_facing_message.as_ref())
|
||||
.map(|m| m.message.clone().into())
|
||||
.unwrap_or_else(|| {
|
||||
// Ask @neondatabase/control-plane for review before adding more.
|
||||
match self.http_status_code {
|
||||
http::StatusCode::NOT_FOUND => {
|
||||
// Status 404: failed to get a project-related resource.
|
||||
format!("{REQUEST_FAILED}: endpoint cannot be found")
|
||||
}
|
||||
http::StatusCode::NOT_ACCEPTABLE => {
|
||||
// Status 406: endpoint is disabled (we don't allow connections).
|
||||
format!("{REQUEST_FAILED}: endpoint is disabled")
|
||||
}
|
||||
http::StatusCode::LOCKED | http::StatusCode::UNPROCESSABLE_ENTITY => {
|
||||
// Status 423: project might be in maintenance mode (or bad state), or quotas exceeded.
|
||||
format!("{REQUEST_FAILED}: endpoint is temporarily unavailable. Check your quotas and/or contact our support.")
|
||||
}
|
||||
_ => REQUEST_FAILED.to_owned(),
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl Display for ConsoleError {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
let msg = self
|
||||
.status
|
||||
.as_ref()
|
||||
.and_then(|s| s.details.user_facing_message.as_ref())
|
||||
.map(|m| m.message.as_ref())
|
||||
.unwrap_or_else(|| &self.error);
|
||||
write!(f, "{}", msg)
|
||||
}
|
||||
}
|
||||
|
||||
impl ShouldRetry for ConsoleError {
|
||||
fn could_retry(&self) -> bool {
|
||||
if self.status.is_none() || self.status.as_ref().unwrap().details.retry_info.is_none() {
|
||||
// retry some temporary failures because the compute was in a bad state
|
||||
// (bad request can be returned when the endpoint was in transition)
|
||||
return match &self {
|
||||
ConsoleError {
|
||||
http_status_code: http::StatusCode::BAD_REQUEST,
|
||||
..
|
||||
} => true,
|
||||
// don't retry when quotas are exceeded
|
||||
ConsoleError {
|
||||
http_status_code: http::StatusCode::UNPROCESSABLE_ENTITY,
|
||||
ref error,
|
||||
..
|
||||
} => !error.contains("compute time quota of non-primary branches is exceeded"),
|
||||
// locked can be returned when the endpoint was in transition
|
||||
// or when quotas are exceeded. don't retry when quotas are exceeded
|
||||
ConsoleError {
|
||||
http_status_code: http::StatusCode::LOCKED,
|
||||
ref error,
|
||||
..
|
||||
} => {
|
||||
!error.contains("quota exceeded")
|
||||
&& !error.contains("the limit for current plan reached")
|
||||
}
|
||||
_ => false,
|
||||
};
|
||||
}
|
||||
|
||||
// retry if the response has a retry delay
|
||||
if let Some(retry_info) = self
|
||||
.status
|
||||
.as_ref()
|
||||
.and_then(|s| s.details.retry_info.as_ref())
|
||||
{
|
||||
retry_info.retry_delay_ms > 0
|
||||
} else {
|
||||
false
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
pub struct Status {
|
||||
pub code: Box<str>,
|
||||
pub message: Box<str>,
|
||||
pub details: Details,
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
pub struct Details {
|
||||
pub error_info: Option<ErrorInfo>,
|
||||
pub retry_info: Option<RetryInfo>,
|
||||
pub user_facing_message: Option<UserFacingMessage>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
pub struct ErrorInfo {
|
||||
pub reason: Reason,
|
||||
// Schema could also have `metadata` field, but it's not structured. Skip it for now.
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, Debug, Deserialize, Default)]
|
||||
pub enum Reason {
|
||||
#[serde(rename = "ROLE_PROTECTED")]
|
||||
RoleProtected,
|
||||
#[serde(rename = "RESOURCE_NOT_FOUND")]
|
||||
ResourceNotFound,
|
||||
#[serde(rename = "PROJECT_NOT_FOUND")]
|
||||
ProjectNotFound,
|
||||
#[serde(rename = "ENDPOINT_NOT_FOUND")]
|
||||
EndpointNotFound,
|
||||
#[serde(rename = "BRANCH_NOT_FOUND")]
|
||||
BranchNotFound,
|
||||
#[serde(rename = "RATE_LIMIT_EXCEEDED")]
|
||||
RateLimitExceeded,
|
||||
#[serde(rename = "NON_PRIMARY_BRANCH_COMPUTE_TIME_EXCEEDED")]
|
||||
NonPrimaryBranchComputeTimeExceeded,
|
||||
#[serde(rename = "ACTIVE_TIME_QUOTA_EXCEEDED")]
|
||||
ActiveTimeQuotaExceeded,
|
||||
#[serde(rename = "COMPUTE_TIME_QUOTA_EXCEEDED")]
|
||||
ComputeTimeQuotaExceeded,
|
||||
#[serde(rename = "WRITTEN_DATA_QUOTA_EXCEEDED")]
|
||||
WrittenDataQuotaExceeded,
|
||||
#[serde(rename = "DATA_TRANSFER_QUOTA_EXCEEDED")]
|
||||
DataTransferQuotaExceeded,
|
||||
#[serde(rename = "LOGICAL_SIZE_QUOTA_EXCEEDED")]
|
||||
LogicalSizeQuotaExceeded,
|
||||
#[default]
|
||||
#[serde(other)]
|
||||
Unknown,
|
||||
}
|
||||
|
||||
impl Reason {
|
||||
pub fn is_not_found(&self) -> bool {
|
||||
matches!(
|
||||
self,
|
||||
Reason::ResourceNotFound
|
||||
| Reason::ProjectNotFound
|
||||
| Reason::EndpointNotFound
|
||||
| Reason::BranchNotFound
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
pub struct RetryInfo {
|
||||
pub retry_delay_ms: u64,
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
pub struct UserFacingMessage {
|
||||
pub message: Box<str>,
|
||||
}
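One detail worth noting in the Reason enum above is the #[serde(other)] plus #[default] Unknown variant: reasons the proxy does not know about yet still deserialize instead of failing the whole error payload. A trimmed-down sketch that mirrors that enum shape (only two known reasons kept; serde and serde_json are assumed):

use serde::Deserialize;

#[derive(Debug, Deserialize, Default, Clone, Copy, PartialEq)]
enum Reason {
    #[serde(rename = "ENDPOINT_NOT_FOUND")]
    EndpointNotFound,
    #[serde(rename = "RATE_LIMIT_EXCEEDED")]
    RateLimitExceeded,
    // Anything else falls back to Unknown rather than a deserialization error.
    #[default]
    #[serde(other)]
    Unknown,
}

#[derive(Debug, Deserialize)]
struct ErrorInfo {
    reason: Reason,
}

fn main() {
    let known: ErrorInfo = serde_json::from_str(r#"{"reason":"ENDPOINT_NOT_FOUND"}"#).unwrap();
    assert_eq!(known.reason, Reason::EndpointNotFound);

    // A reason this build does not know about yet still parses.
    let future: ErrorInfo = serde_json::from_str(r#"{"reason":"SOME_NEW_REASON"}"#).unwrap();
    assert_eq!(future.reason, Reason::Unknown);
    println!("ok");
}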
|
||||
|
||||
/// Response which holds client's auth secret, e.g. [`crate::scram::ServerSecret`].
|
||||
|
||||
@@ -25,8 +25,8 @@ use tracing::info;
|
||||
|
||||
pub mod errors {
|
||||
use crate::{
|
||||
console::messages::{self, ConsoleError},
|
||||
error::{io_error, ReportableError, UserFacingError},
|
||||
http,
|
||||
proxy::retry::ShouldRetry,
|
||||
};
|
||||
use thiserror::Error;
|
||||
@@ -34,17 +34,14 @@ pub mod errors {
|
||||
use super::ApiLockError;
|
||||
|
||||
/// A go-to error message which doesn't leak any detail.
|
||||
const REQUEST_FAILED: &str = "Console request failed";
|
||||
pub const REQUEST_FAILED: &str = "Console request failed";
|
||||
|
||||
/// Common console API error.
|
||||
#[derive(Debug, Error)]
|
||||
pub enum ApiError {
|
||||
/// Error returned by the console itself.
|
||||
#[error("{REQUEST_FAILED} with {}: {}", .status, .text)]
|
||||
Console {
|
||||
status: http::StatusCode,
|
||||
text: Box<str>,
|
||||
},
|
||||
#[error("{REQUEST_FAILED} with {0}")]
|
||||
Console(ConsoleError),
|
||||
|
||||
/// Various IO errors like broken pipe or malformed payload.
|
||||
#[error("{REQUEST_FAILED}: {0}")]
|
||||
@@ -53,11 +50,11 @@ pub mod errors {
|
||||
|
||||
impl ApiError {
|
||||
/// Returns HTTP status code if it's the reason for failure.
|
||||
pub fn http_status_code(&self) -> Option<http::StatusCode> {
|
||||
pub fn get_reason(&self) -> messages::Reason {
|
||||
use ApiError::*;
|
||||
match self {
|
||||
Console { status, .. } => Some(*status),
|
||||
_ => None,
|
||||
Console(e) => e.get_reason(),
|
||||
_ => messages::Reason::Unknown,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -67,22 +64,7 @@ pub mod errors {
|
||||
use ApiError::*;
|
||||
match self {
|
||||
// To minimize risks, only select errors are forwarded to users.
|
||||
// Ask @neondatabase/control-plane for review before adding more.
|
||||
Console { status, .. } => match *status {
|
||||
http::StatusCode::NOT_FOUND => {
|
||||
// Status 404: failed to get a project-related resource.
|
||||
format!("{REQUEST_FAILED}: endpoint cannot be found")
|
||||
}
|
||||
http::StatusCode::NOT_ACCEPTABLE => {
|
||||
// Status 406: endpoint is disabled (we don't allow connections).
|
||||
format!("{REQUEST_FAILED}: endpoint is disabled")
|
||||
}
|
||||
http::StatusCode::LOCKED | http::StatusCode::UNPROCESSABLE_ENTITY => {
|
||||
// Status 423: project might be in maintenance mode (or bad state), or quotas exceeded.
|
||||
format!("{REQUEST_FAILED}: endpoint is temporarily unavailable. Check your quotas and/or contact our support.")
|
||||
}
|
||||
_ => REQUEST_FAILED.to_owned(),
|
||||
},
|
||||
Console(c) => c.get_user_facing_message(),
|
||||
_ => REQUEST_FAILED.to_owned(),
|
||||
}
|
||||
}
|
||||
@@ -91,29 +73,56 @@ pub mod errors {
|
||||
impl ReportableError for ApiError {
|
||||
fn get_error_kind(&self) -> crate::error::ErrorKind {
|
||||
match self {
|
||||
ApiError::Console {
|
||||
status: http::StatusCode::NOT_FOUND | http::StatusCode::NOT_ACCEPTABLE,
|
||||
..
|
||||
} => crate::error::ErrorKind::User,
|
||||
ApiError::Console {
|
||||
status: http::StatusCode::UNPROCESSABLE_ENTITY,
|
||||
text,
|
||||
} if text.contains("compute time quota of non-primary branches is exceeded") => {
|
||||
crate::error::ErrorKind::User
|
||||
ApiError::Console(e) => {
|
||||
use crate::error::ErrorKind::*;
|
||||
match e.get_reason() {
|
||||
crate::console::messages::Reason::RoleProtected => User,
|
||||
crate::console::messages::Reason::ResourceNotFound => User,
|
||||
crate::console::messages::Reason::ProjectNotFound => User,
|
||||
crate::console::messages::Reason::EndpointNotFound => User,
|
||||
crate::console::messages::Reason::BranchNotFound => User,
|
||||
crate::console::messages::Reason::RateLimitExceeded => ServiceRateLimit,
|
||||
crate::console::messages::Reason::NonPrimaryBranchComputeTimeExceeded => {
|
||||
User
|
||||
}
|
||||
crate::console::messages::Reason::ActiveTimeQuotaExceeded => User,
|
||||
crate::console::messages::Reason::ComputeTimeQuotaExceeded => User,
|
||||
crate::console::messages::Reason::WrittenDataQuotaExceeded => User,
|
||||
crate::console::messages::Reason::DataTransferQuotaExceeded => User,
|
||||
crate::console::messages::Reason::LogicalSizeQuotaExceeded => User,
|
||||
crate::console::messages::Reason::Unknown => match &e {
|
||||
ConsoleError {
|
||||
http_status_code:
|
||||
http::StatusCode::NOT_FOUND | http::StatusCode::NOT_ACCEPTABLE,
|
||||
..
|
||||
} => crate::error::ErrorKind::User,
|
||||
ConsoleError {
|
||||
http_status_code: http::StatusCode::UNPROCESSABLE_ENTITY,
|
||||
error,
|
||||
..
|
||||
} if error.contains(
|
||||
"compute time quota of non-primary branches is exceeded",
|
||||
) =>
|
||||
{
|
||||
crate::error::ErrorKind::User
|
||||
}
|
||||
ConsoleError {
|
||||
http_status_code: http::StatusCode::LOCKED,
|
||||
error,
|
||||
..
|
||||
} if error.contains("quota exceeded")
|
||||
|| error.contains("the limit for current plan reached") =>
|
||||
{
|
||||
crate::error::ErrorKind::User
|
||||
}
|
||||
ConsoleError {
|
||||
http_status_code: http::StatusCode::TOO_MANY_REQUESTS,
|
||||
..
|
||||
} => crate::error::ErrorKind::ServiceRateLimit,
|
||||
ConsoleError { .. } => crate::error::ErrorKind::ControlPlane,
|
||||
},
|
||||
}
|
||||
}
|
||||
ApiError::Console {
|
||||
status: http::StatusCode::LOCKED,
|
||||
text,
|
||||
} if text.contains("quota exceeded")
|
||||
|| text.contains("the limit for current plan reached") =>
|
||||
{
|
||||
crate::error::ErrorKind::User
|
||||
}
|
||||
ApiError::Console {
|
||||
status: http::StatusCode::TOO_MANY_REQUESTS,
|
||||
..
|
||||
} => crate::error::ErrorKind::ServiceRateLimit,
|
||||
ApiError::Console { .. } => crate::error::ErrorKind::ControlPlane,
|
||||
ApiError::Transport(_) => crate::error::ErrorKind::ControlPlane,
|
||||
}
|
||||
}
|
||||
@@ -124,31 +133,7 @@ pub mod errors {
|
||||
match self {
|
||||
// retry some transport errors
|
||||
Self::Transport(io) => io.could_retry(),
|
||||
// retry some temporary failures because the compute was in a bad state
|
||||
// (bad request can be returned when the endpoint was in transition)
|
||||
Self::Console {
|
||||
status: http::StatusCode::BAD_REQUEST,
|
||||
..
|
||||
} => true,
|
||||
// don't retry when quotas are exceeded
|
||||
Self::Console {
|
||||
status: http::StatusCode::UNPROCESSABLE_ENTITY,
|
||||
ref text,
|
||||
} => !text.contains("compute time quota of non-primary branches is exceeded"),
|
||||
// locked can be returned when the endpoint was in transition
|
||||
// or when quotas are exceeded. don't retry when quotas are exceeded
|
||||
Self::Console {
|
||||
status: http::StatusCode::LOCKED,
|
||||
ref text,
|
||||
} => {
|
||||
// written data quota exceeded
|
||||
// data transfer quota exceeded
|
||||
// compute time quota exceeded
|
||||
// logical size quota exceeded
|
||||
!text.contains("quota exceeded")
|
||||
&& !text.contains("the limit for current plan reached")
|
||||
}
|
||||
_ => false,
|
||||
Self::Console(e) => e.could_retry(),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -509,7 +494,7 @@ impl<K: Hash + Eq + Clone> ApiLocks<K> {
|
||||
self.metrics
|
||||
.semaphore_acquire_seconds
|
||||
.observe(now.elapsed().as_secs_f64());
|
||||
|
||||
info!("acquired permit {:?}", now.elapsed().as_secs_f64());
|
||||
Ok(WakeComputePermit { permit: permit? })
|
||||
}
|
||||
|
||||
|
||||
@@ -94,12 +94,14 @@ impl Api {
|
||||
let body = match parse_body::<GetRoleSecret>(response).await {
|
||||
Ok(body) => body,
|
||||
// Error 404 is special: it's ok not to have a secret.
|
||||
Err(e) => match e.http_status_code() {
|
||||
Some(http::StatusCode::NOT_FOUND) => {
|
||||
// TODO(anna): retry
|
||||
Err(e) => {
|
||||
if e.get_reason().is_not_found() {
|
||||
return Ok(AuthInfo::default());
|
||||
} else {
|
||||
return Err(e.into());
|
||||
}
|
||||
_otherwise => return Err(e.into()),
|
||||
},
|
||||
}
|
||||
};
|
||||
|
||||
let secret = if body.role_secret.is_empty() {
|
||||
@@ -328,19 +330,24 @@ async fn parse_body<T: for<'a> serde::Deserialize<'a>>(
|
||||
info!("request succeeded, processing the body");
|
||||
return Ok(response.json().await?);
|
||||
}
|
||||
let s = response.bytes().await?;
|
||||
// Log the plaintext body so we can detect cases that are not covered by the error struct.
|
||||
info!("response_error plaintext: {:?}", s);
|
||||
|
||||
// Don't throw an error here because it's not as important
|
||||
// as the fact that the request itself has failed.
|
||||
let body = response.json().await.unwrap_or_else(|e| {
|
||||
let mut body = serde_json::from_slice(&s).unwrap_or_else(|e| {
|
||||
warn!("failed to parse error body: {e}");
|
||||
ConsoleError {
|
||||
error: "reason unclear (malformed error message)".into(),
|
||||
http_status_code: status,
|
||||
status: None,
|
||||
}
|
||||
});
|
||||
body.http_status_code = status;
|
||||
|
||||
let text = body.error;
|
||||
error!("console responded with an error ({status}): {text}");
|
||||
Err(ApiError::Console { status, text })
|
||||
error!("console responded with an error ({status}): {body:?}");
|
||||
Err(ApiError::Console(body))
|
||||
}
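parse_body now reads the raw bytes once, logs them, then tries to parse the structured console error, falling back to a synthetic "reason unclear" value and stamping the HTTP status on afterwards. A rough sketch of that parse-with-fallback shape (ConsoleishError is a toy stand-in, not the real ConsoleError; serde and serde_json are assumed):

use serde::Deserialize;

#[derive(Debug, Deserialize)]
struct ConsoleishError {
    error: String,
    // Not part of the JSON payload; filled in from the HTTP response afterwards.
    #[serde(skip)]
    http_status_code: u16,
}

fn parse_error_body(status: u16, body: &[u8]) -> ConsoleishError {
    let mut parsed: ConsoleishError = serde_json::from_slice(body).unwrap_or_else(|e| {
        eprintln!("failed to parse error body: {e}");
        ConsoleishError {
            error: "reason unclear (malformed error message)".to_string(),
            http_status_code: 0,
        }
    });
    parsed.http_status_code = status;
    parsed
}

fn main() {
    let ok = parse_error_body(404, br#"{"error":"endpoint not found"}"#);
    let bad = parse_error_body(500, b"<html>oops</html>");
    println!("{} {}", ok.http_status_code, ok.error);
    println!("{} {}", bad.http_status_code, bad.error);
}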
|
||||
|
||||
fn parse_host_port(input: &str) -> Option<(&str, u16)> {
|
||||
|
||||
@@ -12,7 +12,7 @@ use crate::auth::backend::{
|
||||
};
|
||||
use crate::config::{CertResolver, RetryConfig};
|
||||
use crate::console::caches::NodeInfoCache;
|
||||
use crate::console::messages::MetricsAuxInfo;
|
||||
use crate::console::messages::{ConsoleError, MetricsAuxInfo};
|
||||
use crate::console::provider::{CachedAllowedIps, CachedRoleSecret, ConsoleBackend};
|
||||
use crate::console::{self, CachedNodeInfo, NodeInfo};
|
||||
use crate::error::ErrorKind;
|
||||
@@ -484,18 +484,20 @@ impl TestBackend for TestConnectMechanism {
|
||||
match action {
|
||||
ConnectAction::Wake => Ok(helper_create_cached_node_info(self.cache)),
|
||||
ConnectAction::WakeFail => {
|
||||
let err = console::errors::ApiError::Console {
|
||||
status: http::StatusCode::FORBIDDEN,
|
||||
text: "TEST".into(),
|
||||
};
|
||||
let err = console::errors::ApiError::Console(ConsoleError {
|
||||
http_status_code: http::StatusCode::FORBIDDEN,
|
||||
error: "TEST".into(),
|
||||
status: None,
|
||||
});
|
||||
assert!(!err.could_retry());
|
||||
Err(console::errors::WakeComputeError::ApiError(err))
|
||||
}
|
||||
ConnectAction::WakeRetry => {
|
||||
let err = console::errors::ApiError::Console {
|
||||
status: http::StatusCode::BAD_REQUEST,
|
||||
text: "TEST".into(),
|
||||
};
|
||||
let err = console::errors::ApiError::Console(ConsoleError {
|
||||
http_status_code: http::StatusCode::BAD_REQUEST,
|
||||
error: "TEST".into(),
|
||||
status: None,
|
||||
});
|
||||
assert!(err.could_retry());
|
||||
Err(console::errors::WakeComputeError::ApiError(err))
|
||||
}
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
use crate::config::RetryConfig;
|
||||
use crate::console::messages::ConsoleError;
|
||||
use crate::console::{errors::WakeComputeError, provider::CachedNodeInfo};
|
||||
use crate::context::RequestMonitoring;
|
||||
use crate::metrics::{
|
||||
@@ -88,36 +89,76 @@ fn report_error(e: &WakeComputeError, retry: bool) {
|
||||
let kind = match e {
|
||||
WakeComputeError::BadComputeAddress(_) => WakeupFailureKind::BadComputeAddress,
|
||||
WakeComputeError::ApiError(ApiError::Transport(_)) => WakeupFailureKind::ApiTransportError,
|
||||
WakeComputeError::ApiError(ApiError::Console {
|
||||
status: StatusCode::LOCKED,
|
||||
ref text,
|
||||
}) if text.contains("written data quota exceeded")
|
||||
|| text.contains("the limit for current plan reached") =>
|
||||
{
|
||||
WakeupFailureKind::QuotaExceeded
|
||||
}
|
||||
WakeComputeError::ApiError(ApiError::Console {
|
||||
status: StatusCode::UNPROCESSABLE_ENTITY,
|
||||
ref text,
|
||||
}) if text.contains("compute time quota of non-primary branches is exceeded") => {
|
||||
WakeupFailureKind::QuotaExceeded
|
||||
}
|
||||
WakeComputeError::ApiError(ApiError::Console {
|
||||
status: StatusCode::LOCKED,
|
||||
..
|
||||
}) => WakeupFailureKind::ApiConsoleLocked,
|
||||
WakeComputeError::ApiError(ApiError::Console {
|
||||
status: StatusCode::BAD_REQUEST,
|
||||
..
|
||||
}) => WakeupFailureKind::ApiConsoleBadRequest,
|
||||
WakeComputeError::ApiError(ApiError::Console { status, .. })
|
||||
if status.is_server_error() =>
|
||||
{
|
||||
WakeupFailureKind::ApiConsoleOtherServerError
|
||||
}
|
||||
WakeComputeError::ApiError(ApiError::Console { .. }) => {
|
||||
WakeupFailureKind::ApiConsoleOtherError
|
||||
}
|
||||
WakeComputeError::ApiError(ApiError::Console(e)) => match e.get_reason() {
|
||||
crate::console::messages::Reason::RoleProtected => {
|
||||
WakeupFailureKind::ApiConsoleBadRequest
|
||||
}
|
||||
crate::console::messages::Reason::ResourceNotFound => {
|
||||
WakeupFailureKind::ApiConsoleBadRequest
|
||||
}
|
||||
crate::console::messages::Reason::ProjectNotFound => {
|
||||
WakeupFailureKind::ApiConsoleBadRequest
|
||||
}
|
||||
crate::console::messages::Reason::EndpointNotFound => {
|
||||
WakeupFailureKind::ApiConsoleBadRequest
|
||||
}
|
||||
crate::console::messages::Reason::BranchNotFound => {
|
||||
WakeupFailureKind::ApiConsoleBadRequest
|
||||
}
|
||||
crate::console::messages::Reason::RateLimitExceeded => {
|
||||
WakeupFailureKind::ApiConsoleLocked
|
||||
}
|
||||
crate::console::messages::Reason::NonPrimaryBranchComputeTimeExceeded => {
|
||||
WakeupFailureKind::QuotaExceeded
|
||||
}
|
||||
crate::console::messages::Reason::ActiveTimeQuotaExceeded => {
|
||||
WakeupFailureKind::QuotaExceeded
|
||||
}
|
||||
crate::console::messages::Reason::ComputeTimeQuotaExceeded => {
|
||||
WakeupFailureKind::QuotaExceeded
|
||||
}
|
||||
crate::console::messages::Reason::WrittenDataQuotaExceeded => {
|
||||
WakeupFailureKind::QuotaExceeded
|
||||
}
|
||||
crate::console::messages::Reason::DataTransferQuotaExceeded => {
|
||||
WakeupFailureKind::QuotaExceeded
|
||||
}
|
||||
crate::console::messages::Reason::LogicalSizeQuotaExceeded => {
|
||||
WakeupFailureKind::QuotaExceeded
|
||||
}
|
||||
crate::console::messages::Reason::Unknown => match e {
|
||||
ConsoleError {
|
||||
http_status_code: StatusCode::LOCKED,
|
||||
ref error,
|
||||
..
|
||||
} if error.contains("written data quota exceeded")
|
||||
|| error.contains("the limit for current plan reached") =>
|
||||
{
|
||||
WakeupFailureKind::QuotaExceeded
|
||||
}
|
||||
ConsoleError {
|
||||
http_status_code: StatusCode::UNPROCESSABLE_ENTITY,
|
||||
ref error,
|
||||
..
|
||||
} if error.contains("compute time quota of non-primary branches is exceeded") => {
|
||||
WakeupFailureKind::QuotaExceeded
|
||||
}
|
||||
ConsoleError {
|
||||
http_status_code: StatusCode::LOCKED,
|
||||
..
|
||||
} => WakeupFailureKind::ApiConsoleLocked,
|
||||
ConsoleError {
|
||||
http_status_code: StatusCode::BAD_REQUEST,
|
||||
..
|
||||
} => WakeupFailureKind::ApiConsoleBadRequest,
|
||||
ConsoleError {
|
||||
http_status_code, ..
|
||||
} if http_status_code.is_server_error() => {
|
||||
WakeupFailureKind::ApiConsoleOtherServerError
|
||||
}
|
||||
ConsoleError { .. } => WakeupFailureKind::ApiConsoleOtherError,
|
||||
},
|
||||
},
|
||||
WakeComputeError::TooManyConnections => WakeupFailureKind::ApiConsoleLocked,
|
||||
WakeComputeError::TooManyConnectionAttempts(_) => WakeupFailureKind::TimeoutError,
|
||||
};
|
||||
|
||||
@@ -1,5 +1,3 @@
|
||||
use std::usize;
|
||||
|
||||
use super::{LimitAlgorithm, Outcome, Sample};
|
||||
|
||||
/// Loss-based congestion avoidance.
|
||||
|
||||
@@ -32,8 +32,6 @@ pub struct ClientFirstMessage<'a> {
|
||||
pub bare: &'a str,
|
||||
/// Channel binding mode.
|
||||
pub cbind_flag: ChannelBinding<&'a str>,
|
||||
/// (Client username)[<https://github.com/postgres/postgres/blob/94226d4506e66d6e7cbf/src/backend/libpq/auth-scram.c#L13>].
|
||||
pub username: &'a str,
|
||||
/// Client nonce.
|
||||
pub nonce: &'a str,
|
||||
}
|
||||
@@ -58,6 +56,14 @@ impl<'a> ClientFirstMessage<'a> {
|
||||
|
||||
// In theory, these might be preceded by "reserved-mext" (i.e. "m=")
|
||||
let username = parts.next()?.strip_prefix("n=")?;
|
||||
|
||||
// https://github.com/postgres/postgres/blob/f83908798f78c4cafda217ca875602c88ea2ae28/src/backend/libpq/auth-scram.c#L13-L14
|
||||
if !username.is_empty() {
|
||||
tracing::warn!(username, "scram username provided, but is not expected")
|
||||
// TODO(conrad):
|
||||
// return None;
|
||||
}
|
||||
|
||||
let nonce = parts.next()?.strip_prefix("r=")?;
|
||||
|
||||
// Validate but ignore auth extensions
|
||||
@@ -66,7 +72,6 @@ impl<'a> ClientFirstMessage<'a> {
|
||||
Some(Self {
|
||||
bare,
|
||||
cbind_flag,
|
||||
username,
|
||||
nonce,
|
||||
})
|
||||
}
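The parser change above drops the stored username and instead only warns when a non-empty n= value shows up, since Postgres sends an empty SCRAM username and takes the real one from the startup packet. A minimal, self-contained sketch of the client-first-message layout being parsed here (an illustration only, not the proxy's real parser):

// gs2 header ("n,," / "y,," / "p=...,,") followed by "n=<user>,r=<nonce>".
fn parse_client_first(msg: &str) -> Option<(&str, &str)> {
    let mut parts = msg.splitn(3, ',');
    let _cbind_flag = parts.next()?;
    let authzid = parts.next()?;
    if !authzid.is_empty() {
        return None; // an authorization identity is not supported
    }
    let bare = parts.next()?;

    let mut attrs = bare.split(',');
    let username = attrs.next()?.strip_prefix("n=")?;
    if !username.is_empty() {
        eprintln!("scram username provided, but is not expected: {username}");
    }
    let nonce = attrs.next()?.strip_prefix("r=")?;
    Some((username, nonce))
}

fn main() {
    let (user, nonce) = parse_client_first("n,,n=,r=t8JwklwKecDLwSsA72rHmVju").unwrap();
    assert_eq!(user, "");
    assert_eq!(nonce, "t8JwklwKecDLwSsA72rHmVju");
    println!("ok");
}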
|
||||
@@ -188,19 +193,18 @@ mod tests {
|
||||
|
||||
// (Almost) real strings captured during debug sessions
|
||||
let cases = [
|
||||
(NotSupportedClient, "n,,n=pepe,r=t8JwklwKecDLwSsA72rHmVju"),
|
||||
(NotSupportedServer, "y,,n=pepe,r=t8JwklwKecDLwSsA72rHmVju"),
|
||||
(NotSupportedClient, "n,,n=,r=t8JwklwKecDLwSsA72rHmVju"),
|
||||
(NotSupportedServer, "y,,n=,r=t8JwklwKecDLwSsA72rHmVju"),
|
||||
(
|
||||
Required("tls-server-end-point"),
|
||||
"p=tls-server-end-point,,n=pepe,r=t8JwklwKecDLwSsA72rHmVju",
|
||||
"p=tls-server-end-point,,n=,r=t8JwklwKecDLwSsA72rHmVju",
|
||||
),
|
||||
];
|
||||
|
||||
for (cb, input) in cases {
|
||||
let msg = ClientFirstMessage::parse(input).unwrap();
|
||||
|
||||
assert_eq!(msg.bare, "n=pepe,r=t8JwklwKecDLwSsA72rHmVju");
|
||||
assert_eq!(msg.username, "pepe");
|
||||
assert_eq!(msg.bare, "n=,r=t8JwklwKecDLwSsA72rHmVju");
|
||||
assert_eq!(msg.nonce, "t8JwklwKecDLwSsA72rHmVju");
|
||||
assert_eq!(msg.cbind_flag, cb);
|
||||
}
|
||||
@@ -208,14 +212,13 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn parse_client_first_message_with_invalid_gs2_authz() {
|
||||
assert!(ClientFirstMessage::parse("n,authzid,n=user,r=nonce").is_none())
|
||||
assert!(ClientFirstMessage::parse("n,authzid,n=,r=nonce").is_none())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_client_first_message_with_extra_params() {
|
||||
let msg = ClientFirstMessage::parse("n,,n=user,r=nonce,a=foo,b=bar,c=baz").unwrap();
|
||||
assert_eq!(msg.bare, "n=user,r=nonce,a=foo,b=bar,c=baz");
|
||||
assert_eq!(msg.username, "user");
|
||||
let msg = ClientFirstMessage::parse("n,,n=,r=nonce,a=foo,b=bar,c=baz").unwrap();
|
||||
assert_eq!(msg.bare, "n=,r=nonce,a=foo,b=bar,c=baz");
|
||||
assert_eq!(msg.nonce, "nonce");
|
||||
assert_eq!(msg.cbind_flag, ChannelBinding::NotSupportedClient);
|
||||
}
|
||||
@@ -223,9 +226,9 @@ mod tests {
|
||||
#[test]
|
||||
fn parse_client_first_message_with_extra_params_invalid() {
|
||||
// must be of the form `<ascii letter>=<...>`
|
||||
assert!(ClientFirstMessage::parse("n,,n=user,r=nonce,abc=foo").is_none());
|
||||
assert!(ClientFirstMessage::parse("n,,n=user,r=nonce,1=foo").is_none());
|
||||
assert!(ClientFirstMessage::parse("n,,n=user,r=nonce,a").is_none());
|
||||
assert!(ClientFirstMessage::parse("n,,n=,r=nonce,abc=foo").is_none());
|
||||
assert!(ClientFirstMessage::parse("n,,n=,r=nonce,1=foo").is_none());
|
||||
assert!(ClientFirstMessage::parse("n,,n=,r=nonce,a").is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
||||
@@ -1,5 +1,5 @@
[toolchain]
channel = "1.78.0"
channel = "1.79.0"
profile = "default"
# The default profile includes rustc, rust-std, cargo, rust-docs, rustfmt and clippy.
# https://rust-lang.github.io/rustup/concepts/profiles.html

@@ -46,6 +46,7 @@ tokio = { workspace = true, features = ["fs"] }
|
||||
tokio-util = { workspace = true }
|
||||
tokio-io-timeout.workspace = true
|
||||
tokio-postgres.workspace = true
|
||||
tokio-tar.workspace = true
|
||||
toml_edit.workspace = true
|
||||
tracing.workspace = true
|
||||
url.workspace = true
|
||||
|
||||
@@ -13,7 +13,9 @@ use tokio::runtime::Handle;
|
||||
use tokio::signal::unix::{signal, SignalKind};
|
||||
use tokio::task::JoinError;
|
||||
use toml_edit::Document;
|
||||
use utils::logging::SecretString;
|
||||
|
||||
use std::env::{var, VarError};
|
||||
use std::fs::{self, File};
|
||||
use std::io::{ErrorKind, Write};
|
||||
use std::str::FromStr;
|
||||
@@ -287,6 +289,22 @@ async fn main() -> anyhow::Result<()> {
|
||||
}
|
||||
};
|
||||
|
||||
// Load JWT auth token to connect to other safekeepers for pull_timeline.
|
||||
let sk_auth_token = match var("SAFEKEEPER_AUTH_TOKEN") {
|
||||
Ok(v) => {
|
||||
info!("loaded JWT token for authentication with safekeepers");
|
||||
Some(SecretString::from(v))
|
||||
}
|
||||
Err(VarError::NotPresent) => {
|
||||
info!("no JWT token for authentication with safekeepers detected");
|
||||
None
|
||||
}
|
||||
Err(_) => {
|
||||
warn!("JWT token for authentication with safekeepers is not unicode");
|
||||
None
|
||||
}
|
||||
};
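A standalone sketch of the same environment-variable pattern, for reference: distinguish "not set" from "set but not unicode" and only treat the former as the normal quiet case. This is a generic illustration, not the safekeeper binary's code.

```rust
// Generic sketch of reading an optional secret from the environment, mirroring
// the SAFEKEEPER_AUTH_TOKEN handling above (illustration only).
use std::env::{var, VarError};

fn optional_secret(name: &str) -> Option<String> {
    match var(name) {
        Ok(v) => Some(v),
        Err(VarError::NotPresent) => None,
        Err(VarError::NotUnicode(_)) => {
            eprintln!("{name} is set but is not valid unicode; ignoring it");
            None
        }
    }
}

fn main() {
    match optional_secret("SAFEKEEPER_AUTH_TOKEN") {
        Some(_) => println!("token loaded"),
        None => println!("no token configured"),
    }
}
```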
|
||||
|
||||
let conf = SafeKeeperConf {
|
||||
workdir,
|
||||
my_id: id,
|
||||
@@ -307,6 +325,7 @@ async fn main() -> anyhow::Result<()> {
|
||||
pg_auth,
|
||||
pg_tenant_only_auth,
|
||||
http_auth,
|
||||
sk_auth_token,
|
||||
current_thread_runtime: args.current_thread_runtime,
|
||||
walsenders_keep_horizon: args.walsenders_keep_horizon,
|
||||
partial_backup_enabled: args.partial_backup_enabled,
|
||||
|
||||
@@ -23,7 +23,7 @@ pub const SK_MAGIC: u32 = 0xcafeceefu32;
|
||||
pub const SK_FORMAT_VERSION: u32 = 8;
|
||||
|
||||
// contains persistent metadata for safekeeper
|
||||
const CONTROL_FILE_NAME: &str = "safekeeper.control";
|
||||
pub const CONTROL_FILE_NAME: &str = "safekeeper.control";
|
||||
// needed to atomically update the state using `rename`
|
||||
const CONTROL_FILE_NAME_PARTIAL: &str = "safekeeper.control.partial";
|
||||
pub const CHECKSUM_SIZE: usize = std::mem::size_of::<u32>();
|
||||
|
||||
139
safekeeper/src/http/client.rs
Normal file
@@ -0,0 +1,139 @@
|
||||
//! Safekeeper http client.
//!
//! Partially copied from the pageserver client; some parts might be better
//! unified.
//!
//! It would also be good to move it out to a separate crate, but this needs
//! duplicating internal-but-reported structs like WalSenderState, ServerInfo,
//! etc.
|
||||
use reqwest::{IntoUrl, Method, StatusCode};
|
||||
use utils::{
|
||||
http::error::HttpErrorBody,
|
||||
id::{TenantId, TimelineId},
|
||||
logging::SecretString,
|
||||
};
|
||||
|
||||
use super::routes::TimelineStatus;
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct Client {
|
||||
mgmt_api_endpoint: String,
|
||||
authorization_header: Option<SecretString>,
|
||||
client: reqwest::Client,
|
||||
}
|
||||
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
pub enum Error {
|
||||
/// Failed to receive body (reqwest error).
|
||||
#[error("receive body: {0}")]
|
||||
ReceiveBody(reqwest::Error),
|
||||
|
||||
/// Status is not ok, but failed to parse body as `HttpErrorBody`.
|
||||
#[error("receive error body: {0}")]
|
||||
ReceiveErrorBody(String),
|
||||
|
||||
/// Status is not ok; parsed error in body as `HttpErrorBody`.
|
||||
#[error("safekeeper API: {1}")]
|
||||
ApiError(StatusCode, String),
|
||||
}
|
||||
|
||||
pub type Result<T> = std::result::Result<T, Error>;
|
||||
|
||||
pub trait ResponseErrorMessageExt: Sized {
|
||||
fn error_from_body(self) -> impl std::future::Future<Output = Result<Self>> + Send;
|
||||
}
|
||||
|
||||
/// If status is not ok, try to extract error message from the body.
|
||||
impl ResponseErrorMessageExt for reqwest::Response {
|
||||
async fn error_from_body(self) -> Result<Self> {
|
||||
let status = self.status();
|
||||
if !(status.is_client_error() || status.is_server_error()) {
|
||||
return Ok(self);
|
||||
}
|
||||
|
||||
let url = self.url().to_owned();
|
||||
Err(match self.json::<HttpErrorBody>().await {
|
||||
Ok(HttpErrorBody { msg }) => Error::ApiError(status, msg),
|
||||
Err(_) => {
|
||||
Error::ReceiveErrorBody(format!("http error ({}) at {}.", status.as_u16(), url))
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl Client {
|
||||
pub fn new(mgmt_api_endpoint: String, jwt: Option<SecretString>) -> Self {
|
||||
Self::from_client(reqwest::Client::new(), mgmt_api_endpoint, jwt)
|
||||
}
|
||||
|
||||
pub fn from_client(
|
||||
client: reqwest::Client,
|
||||
mgmt_api_endpoint: String,
|
||||
jwt: Option<SecretString>,
|
||||
) -> Self {
|
||||
Self {
|
||||
mgmt_api_endpoint,
|
||||
authorization_header: jwt
|
||||
.map(|jwt| SecretString::from(format!("Bearer {}", jwt.get_contents()))),
|
||||
client,
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn timeline_status(
|
||||
&self,
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
) -> Result<TimelineStatus> {
|
||||
let uri = format!(
|
||||
"{}/v1/tenant/{}/timeline/{}",
|
||||
self.mgmt_api_endpoint, tenant_id, timeline_id
|
||||
);
|
||||
let resp = self.get(&uri).await?;
|
||||
resp.json().await.map_err(Error::ReceiveBody)
|
||||
}
|
||||
|
||||
pub async fn snapshot(
|
||||
&self,
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
) -> Result<reqwest::Response> {
|
||||
let uri = format!(
|
||||
"{}/v1/tenant/{}/timeline/{}/snapshot",
|
||||
self.mgmt_api_endpoint, tenant_id, timeline_id
|
||||
);
|
||||
self.get(&uri).await
|
||||
}
|
||||
|
||||
async fn get<U: IntoUrl>(&self, uri: U) -> Result<reqwest::Response> {
|
||||
self.request(Method::GET, uri, ()).await
|
||||
}
|
||||
|
||||
/// Send the request and check that the status code is good.
|
||||
async fn request<B: serde::Serialize, U: reqwest::IntoUrl>(
|
||||
&self,
|
||||
method: Method,
|
||||
uri: U,
|
||||
body: B,
|
||||
) -> Result<reqwest::Response> {
|
||||
let res = self.request_noerror(method, uri, body).await?;
|
||||
let response = res.error_from_body().await?;
|
||||
Ok(response)
|
||||
}
|
||||
|
||||
/// Just send the request.
|
||||
async fn request_noerror<B: serde::Serialize, U: reqwest::IntoUrl>(
|
||||
&self,
|
||||
method: Method,
|
||||
uri: U,
|
||||
body: B,
|
||||
) -> Result<reqwest::Response> {
|
||||
let req = self.client.request(method, uri);
|
||||
let req = if let Some(value) = &self.authorization_header {
|
||||
req.header(reqwest::header::AUTHORIZATION, value.get_contents())
|
||||
} else {
|
||||
req
|
||||
};
|
||||
req.json(&body).send().await.map_err(Error::ReceiveBody)
|
||||
}
|
||||
}
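A rough usage sketch of the client above. The module path, the pub-ness of the TimelineStatus fields, and `TenantId::generate()` are assumptions based on how the rest of the crate uses these types; the endpoint URL and token are made up.

```rust
// Hypothetical caller of the safekeeper HTTP client shown above; the endpoint,
// token and IDs are placeholders, and TenantId::generate()/TimelineId::generate()
// are assumed to exist as in other neon crates.
use safekeeper::http::client::Client;
use utils::id::{TenantId, TimelineId};
use utils::logging::SecretString;

async fn print_timeline_status() -> anyhow::Result<()> {
    let client = Client::new(
        "http://safekeeper-0:7676".to_string(),
        Some(SecretString::from("my-jwt-token".to_string())),
    );

    let tenant_id = TenantId::generate();
    let timeline_id = TimelineId::generate();

    // GET /v1/tenant/:tenant_id/timeline/:timeline_id
    let status = client.timeline_status(tenant_id, timeline_id).await?;
    println!("got status for timeline {}", status.timeline_id);
    Ok(())
}
```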
|
||||
@@ -1,3 +1,4 @@
|
||||
pub mod client;
|
||||
pub mod routes;
|
||||
pub use routes::make_router;
|
||||
|
||||
|
||||
@@ -1,38 +1,25 @@
|
||||
use hyper::{Body, Request, Response, StatusCode, Uri};
|
||||
|
||||
use once_cell::sync::Lazy;
|
||||
use postgres_ffi::WAL_SEGMENT_SIZE;
|
||||
use safekeeper_api::models::{SkTimelineInfo, TimelineCopyRequest};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::fmt;
|
||||
use std::io::Write as _;
|
||||
use std::str::FromStr;
|
||||
use std::sync::Arc;
|
||||
use storage_broker::proto::SafekeeperTimelineInfo;
|
||||
use storage_broker::proto::TenantTimelineId as ProtoTenantTimelineId;
|
||||
use tokio::fs::File;
|
||||
use tokio::io::AsyncReadExt;
|
||||
use tokio::sync::mpsc;
|
||||
use tokio::task;
|
||||
use tokio_stream::wrappers::ReceiverStream;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{info_span, Instrument};
|
||||
use utils::failpoint_support::failpoints_handler;
|
||||
use utils::http::endpoint::{prometheus_metrics_handler, request_span, ChannelWriter};
|
||||
use utils::http::request::parse_query_param;
|
||||
|
||||
use std::io::Write as _;
|
||||
use tokio::sync::mpsc;
|
||||
use tokio_stream::wrappers::ReceiverStream;
|
||||
use tracing::{info_span, Instrument};
|
||||
use utils::http::endpoint::{prometheus_metrics_handler, request_span, ChannelWriter};
|
||||
|
||||
use crate::debug_dump::TimelineDigestRequest;
|
||||
use crate::receive_wal::WalReceiverState;
|
||||
use crate::safekeeper::Term;
|
||||
use crate::safekeeper::{ServerInfo, TermLsn};
|
||||
use crate::send_wal::WalSenderState;
|
||||
use crate::timeline::PeerInfo;
|
||||
use crate::{copy_timeline, debug_dump, patch_control_file, pull_timeline};
|
||||
|
||||
use crate::timelines_global_map::TimelineDeleteForceResult;
|
||||
use crate::GlobalTimelines;
|
||||
use crate::SafeKeeperConf;
|
||||
use postgres_ffi::WAL_SEGMENT_SIZE;
|
||||
use safekeeper_api::models::TimelineCreateRequest;
|
||||
use safekeeper_api::models::{SkTimelineInfo, TimelineCopyRequest};
|
||||
use utils::{
|
||||
auth::SwappableJwtAuth,
|
||||
http::{
|
||||
@@ -46,7 +33,16 @@ use utils::{
|
||||
lsn::Lsn,
|
||||
};
|
||||
|
||||
use super::models::TimelineCreateRequest;
|
||||
use crate::debug_dump::TimelineDigestRequest;
|
||||
use crate::receive_wal::WalReceiverState;
|
||||
use crate::safekeeper::Term;
|
||||
use crate::safekeeper::{ServerInfo, TermLsn};
|
||||
use crate::send_wal::WalSenderState;
|
||||
use crate::timeline::PeerInfo;
|
||||
use crate::timelines_global_map::TimelineDeleteForceResult;
|
||||
use crate::GlobalTimelines;
|
||||
use crate::SafeKeeperConf;
|
||||
use crate::{copy_timeline, debug_dump, patch_control_file, pull_timeline};
|
||||
|
||||
#[derive(Debug, Serialize)]
|
||||
struct SafekeeperStatus {
|
||||
@@ -199,13 +195,50 @@ async fn timeline_pull_handler(mut request: Request<Body>) -> Result<Response<Bo
|
||||
check_permission(&request, None)?;
|
||||
|
||||
let data: pull_timeline::Request = json_request(&mut request).await?;
|
||||
let conf = get_conf(&request);
|
||||
|
||||
let resp = pull_timeline::handle_request(data)
|
||||
let resp = pull_timeline::handle_request(data, conf.sk_auth_token.clone())
|
||||
.await
|
||||
.map_err(ApiError::InternalServerError)?;
|
||||
json_response(StatusCode::OK, resp)
|
||||
}
|
||||
|
||||
/// Stream tar archive with all timeline data.
|
||||
async fn timeline_snapshot_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
let ttid = TenantTimelineId::new(
|
||||
parse_request_param(&request, "tenant_id")?,
|
||||
parse_request_param(&request, "timeline_id")?,
|
||||
);
|
||||
check_permission(&request, Some(ttid.tenant_id))?;
|
||||
|
||||
let tli = GlobalTimelines::get(ttid).map_err(ApiError::from)?;
// Note: with evicted timelines it should work better than de-evicting them
// and streaming; probably start_snapshot would copy the partial s3 file to
// the dest path and stream the control file, or return FullAccessTimeline
// if the timeline is not evicted.
let tli = tli
|
||||
.full_access_guard()
|
||||
.await
|
||||
.map_err(ApiError::InternalServerError)?;
|
||||
|
||||
// To stream the body use wrap_stream which wants Stream of Result<Bytes>,
|
||||
// so create the chan and write to it in another task.
|
||||
let (tx, rx) = mpsc::channel(1);
|
||||
|
||||
task::spawn(pull_timeline::stream_snapshot(tli, tx));
|
||||
|
||||
let rx_stream = ReceiverStream::new(rx);
|
||||
let body = Body::wrap_stream(rx_stream);
|
||||
|
||||
let response = Response::builder()
|
||||
.status(200)
|
||||
.header(hyper::header::CONTENT_TYPE, "application/octet-stream")
|
||||
.body(body)
|
||||
.unwrap();
|
||||
|
||||
Ok(response)
|
||||
}
|
||||
|
||||
async fn timeline_copy_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
check_permission(&request, None)?;
|
||||
|
||||
@@ -260,41 +293,6 @@ async fn timeline_digest_handler(request: Request<Body>) -> Result<Response<Body
|
||||
json_response(StatusCode::OK, response)
|
||||
}
|
||||
|
||||
/// Download a file from the timeline directory.
|
||||
// TODO: figure out a better way to copy files between safekeepers
|
||||
async fn timeline_files_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
let ttid = TenantTimelineId::new(
|
||||
parse_request_param(&request, "tenant_id")?,
|
||||
parse_request_param(&request, "timeline_id")?,
|
||||
);
|
||||
check_permission(&request, Some(ttid.tenant_id))?;
|
||||
|
||||
let filename: String = parse_request_param(&request, "filename")?;
|
||||
|
||||
let tli = GlobalTimelines::get(ttid).map_err(ApiError::from)?;
|
||||
let tli = tli
|
||||
.full_access_guard()
|
||||
.await
|
||||
.map_err(ApiError::InternalServerError)?;
|
||||
|
||||
let filepath = tli.get_timeline_dir().join(filename);
|
||||
let mut file = File::open(&filepath)
|
||||
.await
|
||||
.map_err(|e| ApiError::InternalServerError(e.into()))?;
|
||||
|
||||
let mut content = Vec::new();
|
||||
// TODO: don't store files in memory
|
||||
file.read_to_end(&mut content)
|
||||
.await
|
||||
.map_err(|e| ApiError::InternalServerError(e.into()))?;
|
||||
|
||||
Response::builder()
|
||||
.status(StatusCode::OK)
|
||||
.header("Content-Type", "application/octet-stream")
|
||||
.body(Body::from(content))
|
||||
.map_err(|e| ApiError::InternalServerError(e.into()))
|
||||
}
|
||||
|
||||
/// Force persist control file.
|
||||
async fn timeline_checkpoint_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
check_permission(&request, None)?;
|
||||
@@ -566,13 +564,13 @@ pub fn make_router(conf: SafeKeeperConf) -> RouterBuilder<hyper::Body, ApiError>
|
||||
.delete("/v1/tenant/:tenant_id", |r| {
|
||||
request_span(r, tenant_delete_handler)
|
||||
})
|
||||
.get(
|
||||
"/v1/tenant/:tenant_id/timeline/:timeline_id/snapshot",
|
||||
|r| request_span(r, timeline_snapshot_handler),
|
||||
)
|
||||
.post("/v1/pull_timeline", |r| {
|
||||
request_span(r, timeline_pull_handler)
|
||||
})
|
||||
.get(
|
||||
"/v1/tenant/:tenant_id/timeline/:timeline_id/file/:filename",
|
||||
|r| request_span(r, timeline_files_handler),
|
||||
)
|
||||
.post(
|
||||
"/v1/tenant/:tenant_id/timeline/:source_timeline_id/copy",
|
||||
|r| request_span(r, timeline_copy_handler),
|
||||
|
||||
@@ -7,7 +7,7 @@ use tokio::runtime::Runtime;
|
||||
use std::time::Duration;
|
||||
use storage_broker::Uri;
|
||||
|
||||
use utils::{auth::SwappableJwtAuth, id::NodeId};
|
||||
use utils::{auth::SwappableJwtAuth, id::NodeId, logging::SecretString};
|
||||
|
||||
mod auth;
|
||||
pub mod broker;
|
||||
@@ -78,6 +78,8 @@ pub struct SafeKeeperConf {
|
||||
pub pg_auth: Option<Arc<JwtAuth>>,
|
||||
pub pg_tenant_only_auth: Option<Arc<JwtAuth>>,
|
||||
pub http_auth: Option<Arc<SwappableJwtAuth>>,
|
||||
/// JWT token to connect to other safekeepers with.
|
||||
pub sk_auth_token: Option<SecretString>,
|
||||
pub current_thread_runtime: bool,
|
||||
pub walsenders_keep_horizon: bool,
|
||||
pub partial_backup_enabled: bool,
|
||||
@@ -114,6 +116,7 @@ impl SafeKeeperConf {
|
||||
pg_auth: None,
|
||||
pg_tenant_only_auth: None,
|
||||
http_auth: None,
|
||||
sk_auth_token: None,
|
||||
heartbeat_timeout: Duration::new(5, 0),
|
||||
max_offloader_lag_bytes: defaults::DEFAULT_MAX_OFFLOADER_LAG_BYTES,
|
||||
current_thread_runtime: false,
|
||||
|
||||
@@ -1,28 +1,244 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use anyhow::{anyhow, bail, Context, Result};
|
||||
use bytes::Bytes;
|
||||
use camino::Utf8PathBuf;
|
||||
use camino_tempfile::Utf8TempDir;
|
||||
use chrono::{DateTime, Utc};
|
||||
use futures::{SinkExt, StreamExt, TryStreamExt};
|
||||
use postgres_ffi::{XLogFileName, XLogSegNo, PG_TLI};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::{
|
||||
cmp::min,
|
||||
io::{self, ErrorKind},
|
||||
sync::Arc,
|
||||
};
|
||||
use tokio::{
|
||||
fs::{File, OpenOptions},
|
||||
io::AsyncWrite,
|
||||
sync::mpsc,
|
||||
task,
|
||||
};
|
||||
use tokio_tar::{Archive, Builder};
|
||||
use tokio_util::{
|
||||
io::{CopyToBytes, SinkWriter},
|
||||
sync::PollSender,
|
||||
};
|
||||
use tracing::{error, info, instrument};
|
||||
|
||||
use anyhow::{bail, Context, Result};
|
||||
use tokio::io::AsyncWriteExt;
|
||||
use tracing::info;
|
||||
use crate::{
|
||||
control_file::{self, CONTROL_FILE_NAME},
|
||||
debug_dump,
|
||||
http::{
|
||||
client::{self, Client},
|
||||
routes::TimelineStatus,
|
||||
},
|
||||
safekeeper::Term,
|
||||
timeline::{get_tenant_dir, get_timeline_dir, FullAccessTimeline, Timeline, TimelineError},
|
||||
wal_storage::{self, open_wal_file, Storage},
|
||||
GlobalTimelines, SafeKeeperConf,
|
||||
};
|
||||
use utils::{
|
||||
crashsafe::{durable_rename, fsync_async_opt},
|
||||
id::{TenantId, TenantTimelineId, TimelineId},
|
||||
logging::SecretString,
|
||||
lsn::Lsn,
|
||||
pausable_failpoint,
|
||||
};
|
||||
|
||||
use crate::{
|
||||
control_file, debug_dump,
|
||||
http::routes::TimelineStatus,
|
||||
timeline::{get_tenant_dir, get_timeline_dir, Timeline, TimelineError},
|
||||
wal_storage::{self, Storage},
|
||||
GlobalTimelines, SafeKeeperConf,
|
||||
};
|
||||
/// Stream tar archive of timeline to tx.
#[instrument(name = "snapshot", skip_all, fields(ttid = %tli.ttid))]
pub async fn stream_snapshot(tli: FullAccessTimeline, tx: mpsc::Sender<Result<Bytes>>) {
if let Err(e) = stream_snapshot_guts(tli, tx.clone()).await {
// Error type/contents don't matter as they can't reach the client
// (hyper likely doesn't do anything with them), but the http stream
// will be prematurely terminated. It would be nice to try to send the
// error in trailers though.
tx.send(Err(anyhow!("snapshot failed"))).await.ok();
error!("snapshot failed: {:#}", e);
}
}
|
||||
/// Info about timeline on safekeeper ready for reporting.
|
||||
/// State needed while streaming the snapshot.
|
||||
pub struct SnapshotContext {
|
||||
pub from_segno: XLogSegNo, // including
|
||||
pub upto_segno: XLogSegNo, // including
|
||||
pub term: Term,
|
||||
pub last_log_term: Term,
|
||||
pub flush_lsn: Lsn,
|
||||
pub wal_seg_size: usize,
|
||||
// used to remove WAL hold off in Drop.
|
||||
pub tli: FullAccessTimeline,
|
||||
}
|
||||
|
||||
impl Drop for SnapshotContext {
|
||||
fn drop(&mut self) {
|
||||
let tli = self.tli.clone();
|
||||
task::spawn(async move {
|
||||
let mut shared_state = tli.write_shared_state().await;
|
||||
shared_state.wal_removal_on_hold = false;
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn stream_snapshot_guts(
|
||||
tli: FullAccessTimeline,
|
||||
tx: mpsc::Sender<Result<Bytes>>,
|
||||
) -> Result<()> {
|
||||
// tokio-tar wants a Write implementor, but we have an mpsc tx of Result<Bytes>;
// use SinkWriter as the Write impl. That is,
// - create a Sink from the tx. It returns PollSendError if the chan is closed.
let sink = PollSender::new(tx);
// - SinkWriter needs the sink error to be an io one, so map it.
let sink_io_err = sink.sink_map_err(|_| io::Error::from(ErrorKind::BrokenPipe));
// - SinkWriter wants the sink item type to be just Bytes, not Result<Bytes>,
// so map it with with(). Note that with() takes an async function and allows
// the mapping to fail; we need neither, hence the two Oks.
let oksink = sink_io_err.with(|b: Bytes| async { io::Result::Ok(Result::Ok(b)) });
// - SinkWriter (not surprisingly) wants a sink of &[u8], not Bytes, so wrap it
// in CopyToBytes. This is a data copy.
let copy_to_bytes = CopyToBytes::new(oksink);
let mut writer = SinkWriter::new(copy_to_bytes);
let pinned_writer = std::pin::pin!(writer);
|
||||
// Note that tokio_tar append_* funcs use tokio::io::copy with 8KB buffer
|
||||
// which is also likely suboptimal.
|
||||
let mut ar = Builder::new_non_terminated(pinned_writer);
|
||||
|
||||
let bctx = tli.start_snapshot(&mut ar).await?;
|
||||
pausable_failpoint!("sk-snapshot-after-list-pausable");
|
||||
|
||||
let tli_dir = tli.get_timeline_dir();
|
||||
info!(
|
||||
"sending {} segments [{:#X}-{:#X}], term={}, last_log_term={}, flush_lsn={}",
|
||||
bctx.upto_segno - bctx.from_segno + 1,
|
||||
bctx.from_segno,
|
||||
bctx.upto_segno,
|
||||
bctx.term,
|
||||
bctx.last_log_term,
|
||||
bctx.flush_lsn,
|
||||
);
|
||||
for segno in bctx.from_segno..=bctx.upto_segno {
|
||||
let (mut sf, is_partial) = open_wal_file(&tli_dir, segno, bctx.wal_seg_size).await?;
|
||||
let mut wal_file_name = XLogFileName(PG_TLI, segno, bctx.wal_seg_size);
|
||||
if is_partial {
|
||||
wal_file_name.push_str(".partial");
|
||||
}
|
||||
ar.append_file(&wal_file_name, &mut sf).await?;
|
||||
}
|
||||
|
||||
// Do the term check before ar.finish to make archive corrupted in case of
|
||||
// term change. Client shouldn't ignore abrupt stream end, but to be sure.
|
||||
tli.finish_snapshot(&bctx).await?;
|
||||
|
||||
ar.finish().await?;
|
||||
|
||||
Ok(())
|
||||
}
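The comment block above is the heart of this function: adapting an mpsc channel into something tokio-tar can write to. Below is a minimal standalone sketch of the same adapter chain (PollSender, sink_map_err, CopyToBytes, SinkWriter), assuming tokio, tokio-util and futures with the usual cargo features; here the channel carries plain Bytes rather than Result<Bytes>, so the with() step is not needed.

```rust
// Minimal sketch of turning an mpsc::Sender<Bytes> into an AsyncWrite,
// the same adapter chain used by stream_snapshot_guts (illustration only).
use bytes::Bytes;
use futures::SinkExt;
use tokio::io::AsyncWriteExt;
use tokio::sync::mpsc;
use tokio_util::io::{CopyToBytes, SinkWriter};
use tokio_util::sync::PollSender;

#[tokio::main]
async fn main() -> std::io::Result<()> {
    let (tx, mut rx) = mpsc::channel::<Bytes>(1);

    // PollSender turns the Sender into a Sink; its error must be mapped to an
    // io::Error for SinkWriter, and CopyToBytes adapts Sink<Bytes> to Sink<&[u8]>.
    let sink = PollSender::new(tx)
        .sink_map_err(|_| std::io::Error::from(std::io::ErrorKind::BrokenPipe));
    let mut writer = SinkWriter::new(CopyToBytes::new(sink));

    // Anything written to `writer` shows up as Bytes chunks on the channel.
    let reader = tokio::spawn(async move {
        while let Some(chunk) = rx.recv().await {
            println!("received {} bytes", chunk.len());
        }
    });

    writer.write_all(b"hello from the writer side").await?;
    writer.flush().await?;
    drop(writer); // closes the channel so the reader task finishes
    reader.await.unwrap();
    Ok(())
}
```

In the real code the channel item is Result<Bytes>, so the extra with() mapping wraps each chunk in Ok, and the writer is pinned, presumably because the with() adapter is not Unpin.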
|
||||
|
||||
impl FullAccessTimeline {
|
||||
/// Start streaming tar archive with timeline:
|
||||
/// 1) stream control file under lock;
|
||||
/// 2) hold off WAL removal;
|
||||
/// 3) collect SnapshotContext to understand which WAL segments should be
|
||||
/// streamed.
|
||||
///
|
||||
/// Snapshot streams data up to flush_lsn. To make this safe, we must check
/// that the term doesn't change during the procedure, or we risk sending a
/// mix of WAL from different histories. The term is remembered in the
/// SnapshotContext and checked in finish_snapshot. Note that in the last
/// segment some WAL higher than the flush_lsn set here might be streamed;
/// that's fine as long as the term doesn't change.
///
|
||||
/// Alternatively we could send only up to commit_lsn to get some valid
|
||||
/// state which later will be recovered by compute, in this case term check
|
||||
/// is not needed, but we likely don't want that as there might be no
|
||||
/// compute which could perform the recovery.
|
||||
///
|
||||
/// When returned SnapshotContext is dropped WAL hold is removed.
|
||||
async fn start_snapshot<W: AsyncWrite + Unpin + Send>(
|
||||
&self,
|
||||
ar: &mut tokio_tar::Builder<W>,
|
||||
) -> Result<SnapshotContext> {
|
||||
let mut shared_state = self.write_shared_state().await;
|
||||
|
||||
let cf_path = self.get_timeline_dir().join(CONTROL_FILE_NAME);
|
||||
let mut cf = File::open(cf_path).await?;
|
||||
ar.append_file(CONTROL_FILE_NAME, &mut cf).await?;
|
||||
|
||||
// We need to stream starting from the oldest segment someone (s3 or
// pageserver) still needs. This duplicates calc_horizon_lsn logic.
//
|
||||
// We know that WAL wasn't removed up to this point because it cannot be
|
||||
// removed further than `backup_lsn`. Since we're holding shared_state
|
||||
// lock and setting `wal_removal_on_hold` later, it guarantees that WAL
|
||||
// won't be removed until we're done.
|
||||
let from_lsn = min(
|
||||
shared_state.sk.state.remote_consistent_lsn,
|
||||
shared_state.sk.state.backup_lsn,
|
||||
);
|
||||
if from_lsn == Lsn::INVALID {
|
||||
// this is possible if snapshot is called before handling first
|
||||
// elected message
|
||||
bail!("snapshot is called on uninitialized timeline");
|
||||
}
|
||||
let from_segno = from_lsn.segment_number(shared_state.get_wal_seg_size());
|
||||
let term = shared_state.sk.get_term();
|
||||
let last_log_term = shared_state.sk.get_last_log_term();
|
||||
let flush_lsn = shared_state.sk.flush_lsn();
|
||||
let upto_segno = flush_lsn.segment_number(shared_state.get_wal_seg_size());
|
||||
// have some limit on max number of segments as a sanity check
|
||||
const MAX_ALLOWED_SEGS: u64 = 1000;
|
||||
let num_segs = upto_segno - from_segno + 1;
|
||||
if num_segs > MAX_ALLOWED_SEGS {
|
||||
bail!(
|
||||
"snapshot is called on timeline with {} segments, but the limit is {}",
|
||||
num_segs,
|
||||
MAX_ALLOWED_SEGS
|
||||
);
|
||||
}
|
||||
|
||||
// Prevent WAL removal while we're streaming data.
//
// Since this is a flag, not a counter, just bail out if it's already set;
// we shouldn't need concurrent snapshotting.
if shared_state.wal_removal_on_hold {
bail!("wal_removal_on_hold is already true");
}
shared_state.wal_removal_on_hold = true;
|
||||
let bctx = SnapshotContext {
|
||||
from_segno,
|
||||
upto_segno,
|
||||
term,
|
||||
last_log_term,
|
||||
flush_lsn,
|
||||
wal_seg_size: shared_state.get_wal_seg_size(),
|
||||
tli: self.clone(),
|
||||
};
|
||||
|
||||
Ok(bctx)
|
||||
}
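For intuition on the segment arithmetic above: `Lsn::segment_number` is integer division of the byte position by the WAL segment size, so `from_segno..=upto_segno` covers every segment that may hold WAL between the two LSNs. A rough illustration with plain integers standing in for the Lsn type:

```rust
// Rough illustration of the LSN -> segment number mapping used above;
// plain u64s stand in for the Lsn type (illustration only).
fn segment_number(lsn: u64, wal_seg_size: u64) -> u64 {
    lsn / wal_seg_size
}

fn main() {
    let wal_seg_size: u64 = 16 * 1024 * 1024; // default 16 MiB WAL segments
    let from_lsn = 3 * wal_seg_size + 1; // one byte into segment 3
    let flush_lsn = 5 * wal_seg_size + 4242; // somewhere inside segment 5

    let from_segno = segment_number(from_lsn, wal_seg_size);
    let upto_segno = segment_number(flush_lsn, wal_seg_size);
    assert_eq!((from_segno, upto_segno), (3, 5));

    // Three segments (3, 4, 5) would be streamed, well under MAX_ALLOWED_SEGS.
    assert_eq!(upto_segno - from_segno + 1, 3);
}
```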
|
||||
|
||||
/// Finish snapshotting: check that term(s) hasn't changed.
|
||||
///
|
||||
/// Note that WAL gc hold off is removed in Drop of SnapshotContext to not
|
||||
/// forget this if snapshotting fails mid the way.
|
||||
pub async fn finish_snapshot(&self, bctx: &SnapshotContext) -> Result<()> {
|
||||
let shared_state = self.read_shared_state().await;
|
||||
let term = shared_state.sk.get_term();
|
||||
let last_log_term = shared_state.sk.get_last_log_term();
|
||||
// There are some cases to relax this check (e.g. last_log_term might
|
||||
// change, but as long as older history is strictly part of new that's
|
||||
// fine), but there is no need to do it.
|
||||
if bctx.term != term || bctx.last_log_term != last_log_term {
|
||||
bail!("term(s) changed during snapshot: were term={}, last_log_term={}, now term={}, last_log_term={}",
|
||||
bctx.term, bctx.last_log_term, term, last_log_term);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// pull_timeline request body.
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct Request {
|
||||
pub tenant_id: TenantId,
|
||||
@@ -48,7 +264,10 @@ pub struct DebugDumpResponse {
|
||||
}
|
||||
|
||||
/// Find the most advanced safekeeper and pull timeline from it.
|
||||
pub async fn handle_request(request: Request) -> Result<Response> {
|
||||
pub async fn handle_request(
|
||||
request: Request,
|
||||
sk_auth_token: Option<SecretString>,
|
||||
) -> Result<Response> {
|
||||
let existing_tli = GlobalTimelines::get(TenantTimelineId::new(
|
||||
request.tenant_id,
|
||||
request.timeline_id,
|
||||
@@ -57,28 +276,26 @@ pub async fn handle_request(request: Request) -> Result<Response> {
|
||||
bail!("Timeline {} already exists", request.timeline_id);
|
||||
}
|
||||
|
||||
let client = reqwest::Client::new();
|
||||
let http_hosts = request.http_hosts.clone();
|
||||
|
||||
// Send request to /v1/tenant/:tenant_id/timeline/:timeline_id
|
||||
let responses = futures::future::join_all(http_hosts.iter().map(|url| {
|
||||
let url = format!(
|
||||
"{}/v1/tenant/{}/timeline/{}",
|
||||
url, request.tenant_id, request.timeline_id
|
||||
);
|
||||
client.get(url).send()
|
||||
}))
|
||||
.await;
|
||||
// Figure out statuses of potential donors.
|
||||
let responses: Vec<Result<TimelineStatus, client::Error>> =
|
||||
futures::future::join_all(http_hosts.iter().map(|url| async {
|
||||
let cclient = Client::new(url.clone(), sk_auth_token.clone());
|
||||
let info = cclient
|
||||
.timeline_status(request.tenant_id, request.timeline_id)
|
||||
.await?;
|
||||
Ok(info)
|
||||
}))
|
||||
.await;
|
||||
|
||||
let mut statuses = Vec::new();
|
||||
for (i, response) in responses.into_iter().enumerate() {
|
||||
let response = response.context(format!("Failed to get status from {}", http_hosts[i]))?;
|
||||
let status: crate::http::routes::TimelineStatus = response.json().await?;
|
||||
let status = response.context(format!("fetching status from {}", http_hosts[i]))?;
|
||||
statuses.push((status, i));
|
||||
}
|
||||
|
||||
// Find the most advanced safekeeper
|
||||
// TODO: current logic may be wrong, fix it later
|
||||
let (status, i) = statuses
|
||||
.into_iter()
|
||||
.max_by_key(|(status, _)| {
|
||||
@@ -94,10 +311,14 @@ pub async fn handle_request(request: Request) -> Result<Response> {
|
||||
assert!(status.tenant_id == request.tenant_id);
|
||||
assert!(status.timeline_id == request.timeline_id);
|
||||
|
||||
pull_timeline(status, safekeeper_host).await
|
||||
pull_timeline(status, safekeeper_host, sk_auth_token).await
|
||||
}
|
||||
|
||||
async fn pull_timeline(status: TimelineStatus, host: String) -> Result<Response> {
|
||||
async fn pull_timeline(
|
||||
status: TimelineStatus,
|
||||
host: String,
|
||||
sk_auth_token: Option<SecretString>,
|
||||
) -> Result<Response> {
|
||||
let ttid = TenantTimelineId::new(status.tenant_id, status.timeline_id);
|
||||
info!(
|
||||
"pulling timeline {} from safekeeper {}, commit_lsn={}, flush_lsn={}, term={}, epoch={}",
|
||||
@@ -111,95 +332,53 @@ async fn pull_timeline(status: TimelineStatus, host: String) -> Result<Response>
|
||||
|
||||
let conf = &GlobalTimelines::get_global_config();
|
||||
|
||||
let client = reqwest::Client::new();
|
||||
// TODO: don't use debug dump, it should be used only in tests.
|
||||
// This is a proof of concept, we should figure out a way
|
||||
// to use scp without implementing it manually.
|
||||
|
||||
// Implementing our own scp over HTTP.
|
||||
// At first, we need to fetch list of files from safekeeper.
|
||||
let dump: DebugDumpResponse = client
|
||||
.get(format!(
|
||||
"{}/v1/debug_dump?dump_all=true&tenant_id={}&timeline_id={}",
|
||||
host, status.tenant_id, status.timeline_id
|
||||
))
|
||||
.send()
|
||||
.await?
|
||||
.json()
|
||||
.await?;
|
||||
|
||||
if dump.timelines.len() != 1 {
|
||||
bail!(
|
||||
"expected to fetch single timeline, got {} timelines",
|
||||
dump.timelines.len()
|
||||
);
|
||||
}
|
||||
|
||||
let timeline = dump.timelines.into_iter().next().unwrap();
|
||||
let disk_content = timeline.disk_content.ok_or(anyhow::anyhow!(
|
||||
"timeline {} doesn't have disk content",
|
||||
ttid
|
||||
))?;
|
||||
|
||||
let mut filenames = disk_content
|
||||
.files
|
||||
.iter()
|
||||
.map(|file| file.name.clone())
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
// Sort filenames to make sure we pull files in correct order
|
||||
// After sorting, we should have:
|
||||
// - 000000010000000000000001
|
||||
// - ...
|
||||
// - 000000010000000000000002.partial
|
||||
// - safekeeper.control
|
||||
filenames.sort();
|
||||
|
||||
// safekeeper.control should be the first file, so we need to move it to the beginning
|
||||
let control_file_index = filenames
|
||||
.iter()
|
||||
.position(|name| name == "safekeeper.control")
|
||||
.ok_or(anyhow::anyhow!("safekeeper.control not found"))?;
|
||||
filenames.remove(control_file_index);
|
||||
filenames.insert(0, "safekeeper.control".to_string());
|
||||
|
||||
pausable_failpoint!("sk-pull-timeline-after-list-pausable");
|
||||
|
||||
info!(
|
||||
"downloading {} files from safekeeper {}",
|
||||
filenames.len(),
|
||||
host
|
||||
);
|
||||
|
||||
let (_tmp_dir, tli_dir_path) = create_temp_timeline_dir(conf, ttid).await?;
|
||||
|
||||
// Note: some time happens between fetching list of files and fetching files themselves.
|
||||
// It's possible that some files will be removed from safekeeper and we will fail to fetch them.
|
||||
// This function will fail in this case, should be retried by the caller.
|
||||
for filename in filenames {
|
||||
let file_path = tli_dir_path.join(&filename);
|
||||
// /v1/tenant/:tenant_id/timeline/:timeline_id/file/:filename
|
||||
let http_url = format!(
|
||||
"{}/v1/tenant/{}/timeline/{}/file/{}",
|
||||
host, status.tenant_id, status.timeline_id, filename
|
||||
);
|
||||
let client = Client::new(host.clone(), sk_auth_token.clone());
|
||||
// Request stream with basebackup archive.
|
||||
let bb_resp = client
|
||||
.snapshot(status.tenant_id, status.timeline_id)
|
||||
.await?;
|
||||
|
||||
let mut file = tokio::fs::File::create(&file_path).await?;
|
||||
let mut response = client.get(&http_url).send().await?;
|
||||
if response.status() != reqwest::StatusCode::OK {
|
||||
bail!(
|
||||
"pulling file {} failed: status is {}",
|
||||
filename,
|
||||
response.status()
|
||||
);
|
||||
}
|
||||
while let Some(chunk) = response.chunk().await? {
|
||||
file.write_all(&chunk).await?;
|
||||
file.flush().await?;
|
||||
// Make Stream of Bytes from it...
|
||||
let bb_stream = bb_resp.bytes_stream().map_err(std::io::Error::other);
|
||||
// and turn it into StreamReader implementing AsyncRead.
|
||||
let bb_reader = tokio_util::io::StreamReader::new(bb_stream);
|
||||
|
||||
// Extract it on the fly to the disk. We don't use simple unpack() to fsync
|
||||
// files.
|
||||
let mut entries = Archive::new(bb_reader).entries()?;
|
||||
while let Some(base_tar_entry) = entries.next().await {
|
||||
let mut entry = base_tar_entry?;
|
||||
let header = entry.header();
|
||||
let file_path = header.path()?.into_owned();
|
||||
match header.entry_type() {
|
||||
tokio_tar::EntryType::Regular => {
|
||||
let utf8_file_path =
|
||||
Utf8PathBuf::from_path_buf(file_path).expect("non-Unicode path");
|
||||
let dst_path = tli_dir_path.join(utf8_file_path);
|
||||
let mut f = OpenOptions::new()
|
||||
.create(true)
|
||||
.truncate(true)
|
||||
.write(true)
|
||||
.open(&dst_path)
|
||||
.await?;
|
||||
tokio::io::copy(&mut entry, &mut f).await?;
|
||||
// fsync the file
|
||||
f.sync_all().await?;
|
||||
}
|
||||
_ => {
|
||||
bail!(
|
||||
"entry {} in backup tar archive is of unexpected type: {:?}",
|
||||
file_path.display(),
|
||||
header.entry_type()
|
||||
);
|
||||
}
|
||||
}
|
||||
}
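The snapshot response is consumed above by turning reqwest's bytes_stream() into an AsyncRead via StreamReader and feeding that to tokio-tar. Here is a small self-contained sketch of that Stream to AsyncRead step, using an in-memory stream instead of an HTTP response.

```rust
// Standalone sketch of the Stream -> AsyncRead adaptation used above
// (bytes_stream() + StreamReader); an in-memory stream replaces reqwest.
use bytes::Bytes;
use futures::stream;
use tokio::io::AsyncReadExt;
use tokio_util::io::StreamReader;

#[tokio::main]
async fn main() -> std::io::Result<()> {
    // Stand-in for `bb_resp.bytes_stream().map_err(std::io::Error::other)`:
    // any Stream<Item = io::Result<Bytes>> will do.
    let chunks = stream::iter(vec![
        Ok::<Bytes, std::io::Error>(Bytes::from_static(b"hello ")),
        Ok(Bytes::from_static(b"tar stream")),
    ]);
    let mut reader = StreamReader::new(chunks);

    // tokio_tar::Archive::new(reader) would read entries from here; we just
    // read it back into a string to show the adaptation works.
    let mut buf = String::new();
    reader.read_to_string(&mut buf).await?;
    assert_eq!(buf, "hello tar stream");
    Ok(())
}
```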
|
||||
|
||||
// TODO: fsync?
|
||||
// fsync temp timeline directory to remember its contents.
|
||||
fsync_async_opt(&tli_dir_path, !conf.no_sync).await?;
|
||||
|
||||
// Let's create timeline from temp directory and verify that it's correct
|
||||
let (commit_lsn, flush_lsn) = validate_temp_timeline(conf, ttid, &tli_dir_path).await?;
|
||||
@@ -290,7 +469,9 @@ pub async fn load_temp_timeline(
|
||||
ttid, tmp_path, timeline_path
|
||||
);
|
||||
tokio::fs::create_dir_all(get_tenant_dir(conf, &ttid.tenant_id)).await?;
|
||||
tokio::fs::rename(tmp_path, &timeline_path).await?;
|
||||
// fsync tenant dir creation
|
||||
fsync_async_opt(&conf.workdir, !conf.no_sync).await?;
|
||||
durable_rename(tmp_path, &timeline_path, !conf.no_sync).await?;
|
||||
|
||||
let tli = GlobalTimelines::load_timeline(&guard, ttid)
|
||||
.await
|
||||
|
||||
@@ -780,6 +780,9 @@ where
|
||||
|
||||
// Initializing backup_lsn is useful to avoid making backup think it should upload 0 segment.
|
||||
state.backup_lsn = max(state.backup_lsn, state.timeline_start_lsn);
|
||||
// similar for remote_consistent_lsn
|
||||
state.remote_consistent_lsn =
|
||||
max(state.remote_consistent_lsn, state.timeline_start_lsn);
|
||||
|
||||
state.acceptor_state.term_history = msg.term_history.clone();
|
||||
self.state.finish_change(&state).await?;
|
||||
|
||||
@@ -4,7 +4,7 @@
|
||||
use anyhow::{anyhow, bail, Result};
|
||||
use camino::Utf8PathBuf;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tokio::fs;
|
||||
use tokio::fs::{self};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use utils::id::TenantId;
|
||||
|
||||
@@ -168,6 +168,9 @@ pub struct SharedState {
|
||||
pub(crate) sk: SafeKeeper<control_file::FileStorage, wal_storage::PhysicalStorage>,
|
||||
/// In memory list containing state of peers sent in latest messages from them.
|
||||
pub(crate) peers_info: PeersInfo,
|
||||
// True value hinders old WAL removal; this is used by snapshotting. We
|
||||
// could make it a counter, but there is no need to.
|
||||
pub(crate) wal_removal_on_hold: bool,
|
||||
}
|
||||
|
||||
impl SharedState {
|
||||
@@ -205,6 +208,7 @@ impl SharedState {
|
||||
Ok(Self {
|
||||
sk,
|
||||
peers_info: PeersInfo(vec![]),
|
||||
wal_removal_on_hold: false,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -222,10 +226,11 @@ impl SharedState {
|
||||
Ok(Self {
|
||||
sk: SafeKeeper::new(control_store, wal_store, conf.my_id)?,
|
||||
peers_info: PeersInfo(vec![]),
|
||||
wal_removal_on_hold: false,
|
||||
})
|
||||
}
|
||||
|
||||
fn get_wal_seg_size(&self) -> usize {
|
||||
pub(crate) fn get_wal_seg_size(&self) -> usize {
|
||||
self.sk.state.server.wal_seg_size as usize
|
||||
}
|
||||
|
||||
|
||||
@@ -39,6 +39,7 @@ pub struct StateSnapshot {
|
||||
// misc
|
||||
pub cfile_last_persist_at: Instant,
|
||||
pub inmem_flush_pending: bool,
|
||||
pub wal_removal_on_hold: bool,
|
||||
pub peers: Vec<PeerInfo>,
|
||||
}
|
||||
|
||||
@@ -54,6 +55,7 @@ impl StateSnapshot {
|
||||
cfile_backup_lsn: read_guard.sk.state.backup_lsn,
|
||||
cfile_last_persist_at: read_guard.sk.state.pers.last_persist_at(),
|
||||
inmem_flush_pending: Self::has_unflushed_inmem_state(&read_guard),
|
||||
wal_removal_on_hold: read_guard.wal_removal_on_hold,
|
||||
peers: read_guard.get_peers(heartbeat_timeout),
|
||||
}
|
||||
}
|
||||
@@ -324,8 +326,8 @@ async fn update_wal_removal(
|
||||
last_removed_segno: u64,
|
||||
wal_removal_task: &mut Option<JoinHandle<anyhow::Result<u64>>>,
|
||||
) {
|
||||
if wal_removal_task.is_some() {
// WAL removal is already in progress
if wal_removal_task.is_some() || state.wal_removal_on_hold {
// WAL removal is already in progress or held off
return;
}
|
||||
|
||||
@@ -684,13 +684,12 @@ impl WalReader {
|
||||
let xlogoff = self.pos.segment_offset(self.wal_seg_size);
|
||||
let segno = self.pos.segment_number(self.wal_seg_size);
|
||||
let wal_file_name = XLogFileName(PG_TLI, segno, self.wal_seg_size);
|
||||
let wal_file_path = self.timeline_dir.join(&wal_file_name);
|
||||
|
||||
// Try to open local file, if we may have WAL locally
|
||||
if self.pos >= self.local_start_lsn {
|
||||
let res = Self::open_wal_file(&wal_file_path).await;
|
||||
let res = open_wal_file(&self.timeline_dir, segno, self.wal_seg_size).await;
|
||||
match res {
|
||||
Ok(mut file) => {
|
||||
Ok((mut file, _)) => {
|
||||
file.seek(SeekFrom::Start(xlogoff as u64)).await?;
|
||||
return Ok(Box::pin(file));
|
||||
}
|
||||
@@ -718,25 +717,6 @@ impl WalReader {
|
||||
|
||||
bail!("WAL segment is not found")
|
||||
}
|
||||
|
||||
/// Helper function for opening a wal file.
|
||||
async fn open_wal_file(wal_file_path: &Utf8Path) -> Result<tokio::fs::File> {
|
||||
// First try to open the .partial file.
|
||||
let mut partial_path = wal_file_path.to_owned();
|
||||
partial_path.set_extension("partial");
|
||||
if let Ok(opened_file) = tokio::fs::File::open(&partial_path).await {
|
||||
return Ok(opened_file);
|
||||
}
|
||||
|
||||
// If that failed, try it without the .partial extension.
|
||||
tokio::fs::File::open(&wal_file_path)
|
||||
.await
|
||||
.with_context(|| format!("Failed to open WAL file {:?}", wal_file_path))
|
||||
.map_err(|e| {
|
||||
warn!("{}", e);
|
||||
e
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// Zero block for filling created WAL segments.
|
||||
@@ -758,6 +738,34 @@ async fn write_zeroes(file: &mut File, mut count: usize) -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Helper function for opening WAL segment `segno` in `dir`. Returns file and
|
||||
/// whether it is .partial.
|
||||
pub(crate) async fn open_wal_file(
|
||||
timeline_dir: &Utf8Path,
|
||||
segno: XLogSegNo,
|
||||
wal_seg_size: usize,
|
||||
) -> Result<(tokio::fs::File, bool)> {
|
||||
let (wal_file_path, wal_file_partial_path) = wal_file_paths(timeline_dir, segno, wal_seg_size)?;
|
||||
|
||||
// First try to open the .partial file.
|
||||
let mut partial_path = wal_file_path.to_owned();
|
||||
partial_path.set_extension("partial");
|
||||
if let Ok(opened_file) = tokio::fs::File::open(&wal_file_partial_path).await {
|
||||
return Ok((opened_file, true));
|
||||
}
|
||||
|
||||
// If that failed, try it without the .partial extension.
|
||||
let pf = tokio::fs::File::open(&wal_file_path)
|
||||
.await
|
||||
.with_context(|| format!("failed to open WAL file {:#}", wal_file_path))
|
||||
.map_err(|e| {
|
||||
warn!("{}", e);
|
||||
e
|
||||
})?;
|
||||
|
||||
Ok((pf, false))
|
||||
}
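For context on the names this helper looks for: `postgres_ffi::XLogFileName` renders the timeline id and segment number as the familiar 24-hex-digit WAL file name, and an in-progress segment additionally carries a `.partial` suffix on disk. A small sketch (the segment number is made up):

```rust
// Illustration of WAL segment naming as used by open_wal_file above;
// the segment number is made up.
use postgres_ffi::{XLogFileName, PG_TLI};

fn main() {
    let wal_seg_size: usize = 16 * 1024 * 1024; // default 16 MiB segments
    let segno: u64 = 5;

    let name = XLogFileName(PG_TLI, segno, wal_seg_size);
    // With 16 MiB segments this prints "000000010000000000000005"; the
    // in-progress copy of the same segment would be
    // "000000010000000000000005.partial".
    println!("{name}");
    println!("{name}.partial");
}
```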
|
||||
|
||||
/// Helper returning full path to WAL segment file and its .partial brother.
|
||||
pub fn wal_file_paths(
|
||||
timeline_dir: &Utf8Path,
|
||||
|
||||
@@ -174,6 +174,7 @@ pub fn run_server(os: NodeOs, disk: Arc<SafekeeperDisk>) -> Result<()> {
|
||||
pg_auth: None,
|
||||
pg_tenant_only_auth: None,
|
||||
http_auth: None,
|
||||
sk_auth_token: None,
|
||||
current_thread_runtime: false,
|
||||
walsenders_keep_horizon: false,
|
||||
partial_backup_enabled: false,
|
||||
|
||||
@@ -314,7 +314,7 @@ impl ComputeHook {
|
||||
if endpoint.tenant_id == *tenant_id && endpoint.status() == EndpointStatus::Running {
|
||||
tracing::info!("Reconfiguring endpoint {}", endpoint_name,);
|
||||
endpoint
|
||||
.reconfigure(compute_pageservers.clone(), *stripe_size)
|
||||
.reconfigure(compute_pageservers.clone(), *stripe_size, None)
|
||||
.await?;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -31,6 +31,7 @@ pub(crate) enum PageserverState {
|
||||
Available {
|
||||
last_seen_at: Instant,
|
||||
utilization: PageserverUtilization,
|
||||
new: bool,
|
||||
},
|
||||
Offline,
|
||||
}
|
||||
@@ -127,6 +128,7 @@ impl HeartbeaterTask {
|
||||
heartbeat_futs.push({
|
||||
let jwt_token = self.jwt_token.clone();
|
||||
let cancel = self.cancel.clone();
|
||||
let new_node = !self.state.contains_key(node_id);
|
||||
|
||||
// Clone the node and mark it as available such that the request
|
||||
// goes through to the pageserver even when the node is marked offline.
|
||||
@@ -159,6 +161,7 @@ impl HeartbeaterTask {
|
||||
PageserverState::Available {
|
||||
last_seen_at: Instant::now(),
|
||||
utilization,
|
||||
new: new_node,
|
||||
}
|
||||
} else {
|
||||
PageserverState::Offline
|
||||
@@ -220,6 +223,7 @@ impl HeartbeaterTask {
|
||||
}
|
||||
},
|
||||
Vacant(_) => {
|
||||
// This is a new node. Don't generate a delta for it.
|
||||
deltas.push((node_id, ps_state.clone()));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3,7 +3,7 @@ use std::{str::FromStr, time::Duration};
|
||||
use pageserver_api::{
|
||||
controller_api::{
|
||||
NodeAvailability, NodeDescribeResponse, NodeRegisterRequest, NodeSchedulingPolicy,
|
||||
TenantLocateResponseShard,
|
||||
TenantLocateResponseShard, UtilizationScore,
|
||||
},
|
||||
shard::TenantShardId,
|
||||
};
|
||||
@@ -116,6 +116,16 @@ impl Node {
|
||||
match (self.availability, availability) {
|
||||
(Offline, Active(_)) => ToActive,
|
||||
(Active(_), Offline) => ToOffline,
|
||||
// Consider the case when the storage controller handles the re-attach of a node
|
||||
// before the heartbeats detect that the node is back online. We still need
|
||||
// [`Service::node_configure`] to attempt reconciliations for shards with an
|
||||
// unknown observed location.
|
||||
// The unsavoury match arm below handles this situation.
|
||||
(Active(lhs), Active(rhs))
|
||||
if lhs == UtilizationScore::worst() && rhs < UtilizationScore::worst() =>
|
||||
{
|
||||
ToActive
|
||||
}
|
||||
_ => Unchanged,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -12,7 +12,7 @@ use crate::{
|
||||
id_lock_map::{trace_exclusive_lock, trace_shared_lock, IdLockMap, WrappedWriteGuard},
|
||||
persistence::{AbortShardSplitStatus, TenantFilter},
|
||||
reconciler::{ReconcileError, ReconcileUnits},
|
||||
scheduler::{ScheduleContext, ScheduleMode},
|
||||
scheduler::{MaySchedule, ScheduleContext, ScheduleMode},
|
||||
tenant_shard::{
|
||||
MigrateAttachment, ReconcileNeeded, ScheduleOptimization, ScheduleOptimizationAction,
|
||||
},
|
||||
@@ -747,29 +747,61 @@ impl Service {
|
||||
let res = self.heartbeater.heartbeat(nodes).await;
|
||||
if let Ok(deltas) = res {
|
||||
for (node_id, state) in deltas.0 {
|
||||
let new_availability = match state {
|
||||
PageserverState::Available { utilization, .. } => NodeAvailability::Active(
|
||||
UtilizationScore(utilization.utilization_score),
|
||||
let (new_node, new_availability) = match state {
|
||||
PageserverState::Available {
|
||||
utilization, new, ..
|
||||
} => (
|
||||
new,
|
||||
NodeAvailability::Active(UtilizationScore(
|
||||
utilization.utilization_score,
|
||||
)),
|
||||
),
|
||||
PageserverState::Offline => NodeAvailability::Offline,
|
||||
PageserverState::Offline => (false, NodeAvailability::Offline),
|
||||
};
|
||||
let res = self
|
||||
.node_configure(node_id, Some(new_availability), None)
|
||||
.await;
|
||||
|
||||
match res {
|
||||
Ok(()) => {}
|
||||
Err(ApiError::NotFound(_)) => {
|
||||
// This should be rare, but legitimate since the heartbeats are done
|
||||
// on a snapshot of the nodes.
|
||||
tracing::info!("Node {} was not found after heartbeat round", node_id);
|
||||
if new_node {
// When the heartbeats detect a newly added node, we don't wish
// to attempt to reconcile the shards assigned to it. The node
// is likely handling its re-attach response, so reconciling now
// would be counterproductive.
//
// Instead, update the in-memory state with the details learned about the
// node.
let mut locked = self.inner.write().unwrap();
|
||||
let (nodes, _tenants, scheduler) = locked.parts_mut();
|
||||
|
||||
let mut new_nodes = (**nodes).clone();
|
||||
|
||||
if let Some(node) = new_nodes.get_mut(&node_id) {
|
||||
node.set_availability(new_availability);
|
||||
scheduler.node_upsert(node);
|
||||
}
|
||||
Err(err) => {
|
||||
tracing::error!(
|
||||
"Failed to update node {} after heartbeat round: {}",
|
||||
node_id,
|
||||
err
|
||||
);
|
||||
|
||||
locked.nodes = Arc::new(new_nodes);
|
||||
} else {
// This is the code path for genuine availability transitions (i.e. node
// goes unavailable and/or comes back online).
let res = self
|
||||
.node_configure(node_id, Some(new_availability), None)
|
||||
.await;
|
||||
|
||||
match res {
|
||||
Ok(()) => {}
|
||||
Err(ApiError::NotFound(_)) => {
|
||||
// This should be rare, but legitimate since the heartbeats are done
|
||||
// on a snapshot of the nodes.
|
||||
tracing::info!(
|
||||
"Node {} was not found after heartbeat round",
|
||||
node_id
|
||||
);
|
||||
}
|
||||
Err(err) => {
|
||||
tracing::error!(
|
||||
"Failed to update node {} after heartbeat round: {}",
|
||||
node_id,
|
||||
err
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -2409,11 +2441,17 @@ impl Service {
|
||||
(detach_waiters, shard_ids, node.clone())
|
||||
};
|
||||
|
||||
if let Err(e) = self.await_waiters(detach_waiters, RECONCILE_TIMEOUT).await {
|
||||
// Failing to detach shouldn't hold up deletion, e.g. if a node is offline we should be able
|
||||
// to use some other node to run the remote deletion.
|
||||
tracing::warn!("Failed to detach some locations: {e}");
|
||||
}
|
||||
// This reconcile wait can fail in a few ways:
|
||||
// A there is a very long queue for the reconciler semaphore
|
||||
// B some pageserver is failing to handle a detach promptly
|
||||
// C some pageserver goes offline right at the moment we send it a request.
|
||||
//
|
||||
// A and C are transient: the semaphore will eventually become available, and once a node is marked offline
|
||||
// the next attempt to reconcile will silently skip detaches for an offline node and succeed. If B happens,
|
||||
// it's a bug, and needs resolving at the pageserver level (we shouldn't just leave attachments behind while
|
||||
// deleting the underlying data).
|
||||
self.await_waiters(detach_waiters, RECONCILE_TIMEOUT)
|
||||
.await?;
|
||||
|
||||
let locations = shard_ids
|
||||
.into_iter()
|
||||
@@ -2431,13 +2469,11 @@ impl Service {
|
||||
for result in results {
|
||||
match result {
|
||||
Ok(StatusCode::ACCEPTED) => {
|
||||
// This could happen if we failed detach above, and hit a pageserver where the tenant
|
||||
// is still attached: it will accept the deletion in the background
|
||||
tracing::warn!(
|
||||
"Unexpectedly still attached on {}, client should retry",
|
||||
// This should never happen: we waited for detaches to finish above
|
||||
return Err(ApiError::InternalServerError(anyhow::anyhow!(
|
||||
"Unexpectedly still attached on {}",
|
||||
node
|
||||
);
|
||||
return Ok(StatusCode::ACCEPTED);
|
||||
)));
|
||||
}
|
||||
Ok(_) => {}
|
||||
Err(mgmt_api::Error::Cancelled) => {
|
||||
@@ -4312,6 +4348,16 @@ impl Service {
|
||||
continue;
|
||||
}
|
||||
|
||||
if !new_nodes
|
||||
.values()
|
||||
.any(|n| matches!(n.may_schedule(), MaySchedule::Yes(_)))
|
||||
{
|
||||
// Special case for when all nodes are unavailable and/or unschedulable: there is no point
|
||||
// trying to reschedule since there's nowhere else to go. Without this
|
||||
// branch we incorrectly detach tenants in response to node unavailability.
|
||||
continue;
|
||||
}
|
||||
|
||||
if tenant_shard.intent.demote_attached(scheduler, node_id) {
|
||||
tenant_shard.sequence = tenant_shard.sequence.next();
|
||||
|
||||
@@ -4349,6 +4395,12 @@ impl Service {
|
||||
// When a node comes back online, we must reconcile any tenant that has a None observed
|
||||
// location on the node.
|
||||
for tenant_shard in locked.tenants.values_mut() {
|
||||
// If a reconciliation is already in progress, rely on the previous scheduling
|
||||
// decision and skip triggering a new reconciliation.
|
||||
if tenant_shard.reconciler.is_some() {
|
||||
continue;
|
||||
}
|
||||
|
||||
if let Some(observed_loc) = tenant_shard.observed.locations.get_mut(&node_id) {
|
||||
if observed_loc.conf.is_none() {
|
||||
self.maybe_reconcile_shard(tenant_shard, &new_nodes);
|
||||
|
||||
@@ -1914,6 +1914,7 @@ class NeonCli(AbstractNeonCli):
|
||||
endpoint_id: str,
|
||||
tenant_id: Optional[TenantId] = None,
|
||||
pageserver_id: Optional[int] = None,
|
||||
safekeepers: Optional[List[int]] = None,
|
||||
check_return_code=True,
|
||||
) -> "subprocess.CompletedProcess[str]":
|
||||
args = ["endpoint", "reconfigure", endpoint_id]
|
||||
@@ -1921,6 +1922,8 @@ class NeonCli(AbstractNeonCli):
|
||||
args.extend(["--tenant-id", str(tenant_id)])
|
||||
if pageserver_id is not None:
|
||||
args.extend(["--pageserver-id", str(pageserver_id)])
|
||||
if safekeepers is not None:
|
||||
args.extend(["--safekeepers", (",".join(map(str, safekeepers)))])
|
||||
return self.raw_cli(args, check_return_code=check_return_code)
|
||||
|
||||
def endpoint_stop(
|
||||
@@ -3407,6 +3410,7 @@ class Endpoint(PgProtocol, LogUtils):
|
||||
self.pg_port = pg_port
|
||||
self.http_port = http_port
|
||||
self.check_stop_result = check_stop_result
|
||||
# passed to endpoint create and endpoint reconfigure
|
||||
self.active_safekeepers: List[int] = list(map(lambda sk: sk.id, env.safekeepers))
|
||||
# path to conf is <repo_dir>/endpoints/<endpoint_id>/pgdata/postgresql.conf
|
||||
|
||||
@@ -3469,6 +3473,7 @@ class Endpoint(PgProtocol, LogUtils):
|
||||
self,
|
||||
remote_ext_config: Optional[str] = None,
|
||||
pageserver_id: Optional[int] = None,
|
||||
safekeepers: Optional[List[int]] = None,
|
||||
allow_multiple: bool = False,
|
||||
) -> "Endpoint":
|
||||
"""
|
||||
@@ -3478,6 +3483,11 @@ class Endpoint(PgProtocol, LogUtils):
|
||||
|
||||
assert self.endpoint_id is not None
|
||||
|
||||
# If `safekeepers` is not None, remember them as active and use them
# in the following commands.
if safekeepers is not None:
|
||||
self.active_safekeepers = safekeepers
|
||||
|
||||
log.info(f"Starting postgres endpoint {self.endpoint_id}")
|
||||
|
||||
self.env.neon_cli.endpoint_start(
|
||||
@@ -3538,9 +3548,17 @@ class Endpoint(PgProtocol, LogUtils):
|
||||
if self.running:
|
||||
self.safe_psql("SELECT pg_reload_conf()")
|
||||
|
||||
def reconfigure(self, pageserver_id: Optional[int] = None):
|
||||
def reconfigure(
|
||||
self, pageserver_id: Optional[int] = None, safekeepers: Optional[List[int]] = None
|
||||
):
|
||||
assert self.endpoint_id is not None
|
||||
self.env.neon_cli.endpoint_reconfigure(self.endpoint_id, self.tenant_id, pageserver_id)
|
||||
# If `safekeepers` is not None, remember them as active and use them
# in the following commands.
if safekeepers is not None:
|
||||
self.active_safekeepers = safekeepers
|
||||
self.env.neon_cli.endpoint_reconfigure(
|
||||
self.endpoint_id, self.tenant_id, pageserver_id, self.active_safekeepers
|
||||
)
|
||||
|
||||
def respec(self, **kwargs):
|
||||
"""Update the endpoint.json file used by control_plane."""
|
||||
@@ -3847,7 +3865,15 @@ class Safekeeper(LogUtils):
|
||||
assert isinstance(res, dict)
|
||||
return res
|
||||
|
||||
def http_client(self, auth_token: Optional[str] = None) -> SafekeeperHttpClient:
|
||||
def http_client(
|
||||
self, auth_token: Optional[str] = None, gen_sk_wide_token: bool = True
|
||||
) -> SafekeeperHttpClient:
|
||||
"""
|
||||
When auth_token is None but gen_sk_wide is True creates safekeeper wide
|
||||
token, which is a reasonable default.
|
||||
"""
|
||||
if auth_token is None and gen_sk_wide_token:
|
||||
auth_token = self.env.auth_keys.generate_safekeeper_token()
|
||||
is_testing_enabled = '"testing"' in self.env.get_binary_version("safekeeper")
|
||||
return SafekeeperHttpClient(
|
||||
port=self.port.http, auth_token=auth_token, is_testing_enabled=is_testing_enabled
|
||||
@@ -3897,11 +3923,13 @@ class Safekeeper(LogUtils):
|
||||
segments.sort()
|
||||
return segments
|
||||
|
||||
def checkpoint_up_to(self, tenant_id: TenantId, timeline_id: TimelineId, lsn: Lsn):
|
||||
def checkpoint_up_to(
|
||||
self, tenant_id: TenantId, timeline_id: TimelineId, lsn: Lsn, wait_wal_removal=True
|
||||
):
|
||||
"""
|
||||
Assuming pageserver(s) uploaded to s3 up to `lsn`,
|
||||
1) wait for remote_consistent_lsn and wal_backup_lsn on safekeeper to reach it.
|
||||
2) checkpoint timeline on safekeeper, which should remove WAL before this LSN.
|
||||
2) checkpoint timeline on safekeeper, which should remove WAL before this LSN; optionally wait for that.
|
||||
"""
|
||||
cli = self.http_client()
|
||||
|
||||
@@ -3925,7 +3953,8 @@ class Safekeeper(LogUtils):
|
||||
# pageserver to this safekeeper
|
||||
wait_until(30, 1, are_lsns_advanced)
|
||||
cli.checkpoint(tenant_id, timeline_id)
|
||||
wait_until(30, 1, are_segments_removed)
|
||||
if wait_wal_removal:
|
||||
wait_until(30, 1, are_segments_removed)
|
||||
|
||||
def wait_until_paused(self, failpoint: str):
|
||||
msg = f"at failpoint {failpoint}"
|
||||
@@ -4447,6 +4476,7 @@ def wait_for_last_flush_lsn(
|
||||
tenant: TenantId,
|
||||
timeline: TimelineId,
|
||||
pageserver_id: Optional[int] = None,
|
||||
auth_token: Optional[str] = None,
|
||||
) -> Lsn:
|
||||
"""Wait for pageserver to catch up the latest flush LSN, returns the last observed lsn."""
|
||||
|
||||
@@ -4460,7 +4490,7 @@ def wait_for_last_flush_lsn(
|
||||
f"wait_for_last_flush_lsn: waiting for {last_flush_lsn} on shard {tenant_shard_id} on pageserver {pageserver.id})"
|
||||
)
|
||||
waited = wait_for_last_record_lsn(
|
||||
pageserver.http_client(), tenant_shard_id, timeline, last_flush_lsn
|
||||
pageserver.http_client(auth_token=auth_token), tenant_shard_id, timeline, last_flush_lsn
|
||||
)
|
||||
|
||||
assert waited >= last_flush_lsn
|
||||
@@ -4556,6 +4586,7 @@ def last_flush_lsn_upload(
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
pageserver_id: Optional[int] = None,
|
||||
auth_token: Optional[str] = None,
|
||||
) -> Lsn:
|
||||
"""
|
||||
Wait for pageserver to catch to the latest flush LSN of given endpoint,
|
||||
@@ -4563,11 +4594,11 @@ def last_flush_lsn_upload(
|
||||
reaching flush LSN).
|
||||
"""
|
||||
last_flush_lsn = wait_for_last_flush_lsn(
|
||||
env, endpoint, tenant_id, timeline_id, pageserver_id=pageserver_id
|
||||
env, endpoint, tenant_id, timeline_id, pageserver_id=pageserver_id, auth_token=auth_token
|
||||
)
|
||||
shards = tenant_get_shards(env, tenant_id, pageserver_id)
|
||||
for tenant_shard_id, pageserver in shards:
|
||||
ps_http = pageserver.http_client()
|
||||
ps_http = pageserver.http_client(auth_token=auth_token)
|
||||
wait_for_last_record_lsn(ps_http, tenant_shard_id, timeline_id, last_flush_lsn)
|
||||
# force a checkpoint to trigger upload
|
||||
ps_http.timeline_checkpoint(tenant_shard_id, timeline_id)
|
||||
|
||||
@@ -94,8 +94,6 @@ DEFAULT_PAGESERVER_ALLOWED_ERRORS = (
".*WARN.*path=/v1/utilization .*request was dropped before completing",
# Can happen during shutdown
".*scheduling deletion on drop failed: queue is in state Stopped.*",
# Can happen during shutdown
".*ignoring failure to find gc cutoffs: timeline shutting down.*",
)


@@ -1,5 +1,4 @@
import json
import os
from pathlib import Path
from typing import Any, Dict, Tuple

@@ -35,10 +34,6 @@ from performance.pageserver.util import (
@pytest.mark.timeout(
10000
) # TODO: this value is just "a really high number"; have this per instance type
@pytest.mark.skipif(
os.getenv("CI", "false") == "true",
reason="The test if flaky on CI: https://github.com/neondatabase/neon/issues/6724",
)
def test_pageserver_max_throughput_getpage_at_latest_lsn(
neon_env_builder: NeonEnvBuilder,
zenbenchmark: NeonBenchmarker,
@@ -91,6 +86,14 @@ def test_pageserver_max_throughput_getpage_at_latest_lsn(
n_tenants,
setup_wrapper,
)

env.pageserver.allowed_errors.append(
# https://github.com/neondatabase/neon/issues/6925
# https://github.com/neondatabase/neon/issues/6390
# https://github.com/neondatabase/neon/issues/6724
r".*query handler for.*pagestream.*failed: unexpected message: CopyFail during COPY.*"
)

run_benchmark_max_throughput_latest_lsn(env, pg_bin, record, duration)

15
test_runner/performance/pgvector/halfvec_build.sql
Normal file
@@ -0,0 +1,15 @@
DROP TABLE IF EXISTS halfvec_test_table;

CREATE TABLE halfvec_test_table (
_id text NOT NULL,
title text,
text text,
embeddings halfvec(1536),
PRIMARY KEY (_id)
);

INSERT INTO halfvec_test_table (_id, title, text, embeddings)
SELECT _id, title, text, embeddings::halfvec
FROM documents;

CREATE INDEX documents_half_precision_hnsw_idx ON halfvec_test_table USING hnsw (embeddings halfvec_cosine_ops) WITH (m = 64, ef_construction = 128);
@@ -0,0 +1,13 @@
-- run with pooled connection
-- pgbench -T 300 -c 100 -j20 -f pgbench_halfvec_queries.sql "postgresql://neondb_owner:<secret>@ep-floral-thunder-w1gzhaxi-pooler.eu-west-1.aws.neon.build/neondb?sslmode=require"

with x (x) as (
select "embeddings" as x
from halfvec_test_table
TABLESAMPLE SYSTEM (1)
LIMIT 1
)
SELECT title, "embeddings" <=> (select x from x) as distance
FROM halfvec_test_table
ORDER BY 2
LIMIT 30;
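The script above samples one random embedding via TABLESAMPLE and ranks the 30 nearest rows by the <=> cosine-distance operator, matching the halfvec_cosine_ops index. A rough way to sanity-check the same query from Python before handing it to pgbench; a sketch only, assuming psycopg2 and a placeholder connection string, neither of which is part of this change:

import psycopg2

# Placeholder DSN; point it at a database where halfvec_build.sql has been run.
with psycopg2.connect("postgresql://user:password@localhost:5432/neondb") as conn:
    with conn.cursor() as cur:
        cur.execute(
            """
            with x (x) as (
                select "embeddings" as x
                from halfvec_test_table
                TABLESAMPLE SYSTEM (1)
                LIMIT 1
            )
            SELECT title, "embeddings" <=> (select x from x) as distance
            FROM halfvec_test_table
            ORDER BY 2
            LIMIT 30;
            """
        )
        for title, distance in cur.fetchall():
            print(f"{distance:.4f}  {title}")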
@@ -1,13 +0,0 @@
-- run with pooled connection
-- pgbench -T 300 -c 100 -j20 -f pgbench_hnsw_queries.sql -postgresql://neondb_owner:<secret>@ep-floral-thunder-w1gzhaxi-pooler.eu-west-1.aws.neon.build/neondb?sslmode=require"

with x (x) as (
select "embeddings" as x
from hnsw_test_table
TABLESAMPLE SYSTEM (1)
LIMIT 1
)
SELECT title, "embeddings" <=> (select x from x) as distance
FROM hnsw_test_table
ORDER BY 2
LIMIT 30;
@@ -106,6 +106,7 @@ QUERIES: Tuple[LabelledQuery, ...] = (
# Disable auto formatting for the list of queries so that it's easier to read
# fmt: off
PGVECTOR_QUERIES: Tuple[LabelledQuery, ...] = (
LabelledQuery("PGVPREP", r"ALTER EXTENSION VECTOR UPDATE;"),
LabelledQuery("PGV0", r"DROP TABLE IF EXISTS hnsw_test_table;"),
LabelledQuery("PGV1", r"CREATE TABLE hnsw_test_table AS TABLE documents WITH NO DATA;"),
LabelledQuery("PGV2", r"INSERT INTO hnsw_test_table SELECT * FROM documents;"),
@@ -115,6 +116,10 @@ PGVECTOR_QUERIES: Tuple[LabelledQuery, ...] = (
LabelledQuery("PGV6", r"CREATE INDEX ON hnsw_test_table USING hnsw (embeddings vector_l1_ops);"),
LabelledQuery("PGV7", r"CREATE INDEX ON hnsw_test_table USING hnsw ((binary_quantize(embeddings)::bit(1536)) bit_hamming_ops);"),
LabelledQuery("PGV8", r"CREATE INDEX ON hnsw_test_table USING hnsw ((binary_quantize(embeddings)::bit(1536)) bit_jaccard_ops);"),
LabelledQuery("PGV9", r"DROP TABLE IF EXISTS halfvec_test_table;"),
LabelledQuery("PGV10", r"CREATE TABLE halfvec_test_table (_id text NOT NULL, title text, text text, embeddings halfvec(1536), PRIMARY KEY (_id));"),
LabelledQuery("PGV11", r"INSERT INTO halfvec_test_table (_id, title, text, embeddings) SELECT _id, title, text, embeddings::halfvec FROM documents;"),
LabelledQuery("PGV12", r"CREATE INDEX documents_half_precision_hnsw_idx ON halfvec_test_table USING hnsw (embeddings halfvec_cosine_ops) WITH (m = 64, ef_construction = 128);"),
)
# fmt: on

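For context, a hedged sketch of the shape these labelled queries take and how a harness could execute such a tuple, timing each statement. The real LabelledQuery type and runner are not shown in this diff, so the NamedTuple below is an assumption for illustration only:

import time
from typing import Iterable, NamedTuple

import psycopg2


class LabelledQuery(NamedTuple):
    # Assumed shape: a short label for reporting plus the SQL text.
    label: str
    query: str


def run_labelled_queries(connstr: str, queries: Iterable[LabelledQuery]) -> None:
    # Execute each labelled query once and report its wall-clock duration.
    with psycopg2.connect(connstr) as conn:
        with conn.cursor() as cur:
            for q in queries:
                started = time.monotonic()
                cur.execute(q.query)
                print(f"{q.label}: {time.monotonic() - started:.2f}s")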
@@ -18,6 +18,7 @@ class PgBenchLoadType(enum.Enum):
SIMPLE_UPDATE = "simple-update"
SELECT_ONLY = "select-only"
PGVECTOR_HNSW = "pgvector-hnsw"
PGVECTOR_HALFVEC = "pgvector-halfvec"


def utc_now_timestamp() -> int:
@@ -153,6 +154,26 @@ def run_test_pgbench(env: PgCompare, scale: int, duration: int, workload_type: P
password=password,
)

if workload_type == PgBenchLoadType.PGVECTOR_HALFVEC:
# Run the pgvector halfvec queries workload
run_pgbench(
env,
"pgvector-halfvec",
[
"pgbench",
"-f",
"test_runner/performance/pgvector/pgbench_custom_script_pgvector_halfvec_queries.sql",
"-c100",
"-j20",
f"-T{duration}",
"-P2",
"--protocol=prepared",
"--progress-timestamp",
connstr,
],
password=password,
)

env.report_size()

@@ -222,13 +243,3 @@ def test_pgbench_remote_simple_update(remote_compare: PgCompare, scale: int, dur
@pytest.mark.remote_cluster
def test_pgbench_remote_select_only(remote_compare: PgCompare, scale: int, duration: int):
run_test_pgbench(remote_compare, scale, duration, PgBenchLoadType.SELECT_ONLY)


# The following test runs on an existing database that has pgvector extension installed
# and a table with 1 million embedding vectors loaded and indexed with HNSW.
#
# Run this pgbench tests against an existing remote Postgres cluster with the necessary setup.
@pytest.mark.parametrize("duration", get_durations_matrix())
@pytest.mark.remote_cluster
def test_pgbench_remote_pgvector(remote_compare: PgCompare, duration: int):
run_test_pgbench(remote_compare, 1, duration, PgBenchLoadType.PGVECTOR_HNSW)

24
test_runner/performance/test_perf_pgvector_queries.py
Normal file
@@ -0,0 +1,24 @@
import pytest
from fixtures.compare_fixtures import PgCompare

from performance.test_perf_pgbench import PgBenchLoadType, get_durations_matrix, run_test_pgbench


# The following test runs on an existing database that has pgvector extension installed
# and a table with 1 million embedding vectors loaded and indexed with HNSW.
#
# Run this pgbench test against an existing remote Postgres cluster with the necessary setup.
@pytest.mark.parametrize("duration", get_durations_matrix())
@pytest.mark.remote_cluster
def test_pgbench_remote_pgvector_hnsw(remote_compare: PgCompare, duration: int):
run_test_pgbench(remote_compare, 1, duration, PgBenchLoadType.PGVECTOR_HNSW)


# The following test runs on an existing database that has pgvector extension installed
# and a table with 1 million embedding vectors loaded and indexed with halfvec.
#
# Run this pgbench test against an existing remote Postgres cluster with the necessary setup.
@pytest.mark.parametrize("duration", get_durations_matrix())
@pytest.mark.remote_cluster
def test_pgbench_remote_pgvector_halfvec(remote_compare: PgCompare, duration: int):
run_test_pgbench(remote_compare, 1, duration, PgBenchLoadType.PGVECTOR_HALFVEC)
@@ -300,7 +300,7 @@ def test_replica_query_race(neon_simple_env: NeonEnv):
p_cur.execute("CREATE TABLE test AS SELECT 0 AS counter")

standby_ep = env.endpoints.new_replica_start(origin=primary_ep, endpoint_id="standby")
time.sleep(1)
wait_replica_caughtup(primary_ep, standby_ep)

# In primary, run a lot of UPDATEs on a single page
finished = False
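Replacing the fixed time.sleep(1) with wait_replica_caughtup makes the setup deterministic rather than timing-dependent. A hedged sketch of what such a helper typically does, polling the standby's replay progress against the primary's flush LSN (the actual fixture implementation may differ; psycopg2-style connections are assumed):

import time


def wait_replica_caughtup_sketch(primary_conn, standby_conn, timeout=30.0):
    # Read the primary's current flush LSN once...
    with primary_conn.cursor() as cur:
        cur.execute("SELECT pg_current_wal_flush_lsn()")
        target_lsn = cur.fetchone()[0]
    # ...then poll the standby until it has replayed at least that far.
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        with standby_conn.cursor() as cur:
            cur.execute("SELECT pg_last_wal_replay_lsn() >= %s::pg_lsn", (target_lsn,))
            if cur.fetchone()[0]:
                return
        time.sleep(0.5)
    raise TimeoutError(f"standby did not reach {target_lsn}")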
@@ -129,3 +129,33 @@ def test_ondemand_download_replica(neon_env_builder: NeonEnvBuilder, shard_count
cur_replica = conn_replica.cursor()
cur_replica.execute("SELECT * FROM clogtest")
assert cur_replica.fetchall() == [(1,), (3,)]


def test_ondemand_download_after_wal_switch(neon_env_builder: NeonEnvBuilder):
"""
Test on-demand SLRU download on standby, when starting right after
WAL segment switch.

This is a repro for a bug in how the LSN at WAL page/segment
boundary was handled (https://github.com/neondatabase/neon/issues/8030)
"""

tenant_conf = {
"lazy_slru_download": "true",
}
env = neon_env_builder.init_start(initial_tenant_conf=tenant_conf)

endpoint = env.endpoints.create_start("main")
pg_conn = endpoint.connect()
cur = pg_conn.cursor()

# Create a test table
cur.execute("CREATE TABLE clogtest (id integer)")
cur.execute("INSERT INTO clogtest VALUES (1)")

# Start standby at WAL segment boundary
cur.execute("SELECT pg_switch_wal()")
lsn = Lsn(query_scalar(cur, "SELECT pg_current_wal_insert_lsn()"))
_endpoint_at_lsn = env.endpoints.create_start(
branch_name="main", endpoint_id="ep-at-lsn", lsn=lsn
)
@@ -75,9 +75,6 @@ def test_metric_collection(
env.pageserver.allowed_errors.extend(
[
".*metrics endpoint refused the sent metrics*",
# we have a fast rate of calculation, these can happen at shutdown
".*synthetic_size_worker:calculate_synthetic_size.*:gather_size_inputs.*: failed to calculate logical size at .*: cancelled.*",
".*synthetic_size_worker: failed to calculate synthetic size for tenant .*: failed to calculate some logical_sizes",
".*metrics_collection: failed to upload to S3: Failed to upload data of length .* to storage path.*",
]
)
@@ -238,9 +235,6 @@ def test_metric_collection_cleans_up_tempfile(
env.pageserver.allowed_errors.extend(
[
".*metrics endpoint refused the sent metrics*",
# we have a fast rate of calculation, these can happen at shutdown
".*synthetic_size_worker:calculate_synthetic_size.*:gather_size_inputs.*: failed to calculate logical size at .*: cancelled.*",
".*synthetic_size_worker: failed to calculate synthetic size for tenant .*: failed to calculate some logical_sizes",
]
)

@@ -133,6 +133,9 @@ def test_storage_controller_smoke(

wait_until(10, 1, lambda: node_evacuated(env.pageservers[0].id))

# Let all the reconciliations after marking the node offline complete
env.storage_controller.reconcile_until_idle()

# Marking pageserver active should not migrate anything to it
# immediately
env.storage_controller.node_configure(env.pageservers[0].id, {"availability": "Active"})
@@ -931,19 +934,27 @@ class Failure:
def clear(self, env: NeonEnv):
raise NotImplementedError()

def nodes(self):
raise NotImplementedError()


class NodeStop(Failure):
def __init__(self, pageserver_id, immediate):
self.pageserver_id = pageserver_id
def __init__(self, pageserver_ids, immediate):
self.pageserver_ids = pageserver_ids
self.immediate = immediate

def apply(self, env: NeonEnv):
pageserver = env.get_pageserver(self.pageserver_id)
pageserver.stop(immediate=self.immediate)
for ps_id in self.pageserver_ids:
pageserver = env.get_pageserver(ps_id)
pageserver.stop(immediate=self.immediate)

def clear(self, env: NeonEnv):
pageserver = env.get_pageserver(self.pageserver_id)
pageserver.start()
for ps_id in self.pageserver_ids:
pageserver = env.get_pageserver(ps_id)
pageserver.start()

def nodes(self):
return self.pageserver_ids


class PageserverFailpoint(Failure):
@@ -959,6 +970,9 @@ class PageserverFailpoint(Failure):
pageserver = env.get_pageserver(self.pageserver_id)
pageserver.http_client().configure_failpoints((self.failpoint, "off"))

def nodes(self):
return [self.pageserver_id]


def build_node_to_tenants_map(env: NeonEnv) -> dict[int, list[TenantId]]:
tenants = env.storage_controller.tenant_list()
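With nodes() on the Failure base class, the heartbeat test below no longer assumes a single failed pageserver. A hypothetical subclass, sketched here only to illustrate the apply()/clear()/nodes() contract (it is not part of this diff and reuses only APIs visible above):

import time


class NodeStopStaggeredRestart(Failure):
    # Hypothetical: stop several pageservers at once, bring them back one by one.
    def __init__(self, pageserver_ids, restart_delay=2.0):
        self.pageserver_ids = pageserver_ids
        self.restart_delay = restart_delay

    def apply(self, env: NeonEnv):
        for ps_id in self.pageserver_ids:
            env.get_pageserver(ps_id).stop(immediate=True)

    def clear(self, env: NeonEnv):
        for ps_id in self.pageserver_ids:
            env.get_pageserver(ps_id).start()
            time.sleep(self.restart_delay)

    def nodes(self):
        # The test derives its offline/online node sets from this list.
        return self.pageserver_ids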
@@ -982,8 +996,9 @@ def build_node_to_tenants_map(env: NeonEnv) -> dict[int, list[TenantId]]:
@pytest.mark.parametrize(
"failure",
[
NodeStop(pageserver_id=1, immediate=False),
NodeStop(pageserver_id=1, immediate=True),
NodeStop(pageserver_ids=[1], immediate=False),
NodeStop(pageserver_ids=[1], immediate=True),
NodeStop(pageserver_ids=[1, 2], immediate=True),
PageserverFailpoint(pageserver_id=1, failpoint="get-utilization-http-handler"),
],
)
@@ -1036,33 +1051,50 @@ def test_storage_controller_heartbeats(
wait_until(10, 1, tenants_placed)

# ... then we apply the failure
offline_node_id = failure.pageserver_id
online_node_id = (set(range(1, len(env.pageservers) + 1)) - {offline_node_id}).pop()
env.get_pageserver(offline_node_id).allowed_errors.append(
# In the case of the failpoint failure, the impacted pageserver
# still believes it has the tenant attached since location
# config calls into it will fail due to being marked offline.
".*Dropped remote consistent LSN updates.*",
)
offline_node_ids = set(failure.nodes())
online_node_ids = set(range(1, len(env.pageservers) + 1)) - offline_node_ids

for node_id in offline_node_ids:
env.get_pageserver(node_id).allowed_errors.append(
# In the case of the failpoint failure, the impacted pageserver
# still believes it has the tenant attached since location
# config calls into it will fail due to being marked offline.
".*Dropped remote consistent LSN updates.*",
)

if len(offline_node_ids) > 1:
env.get_pageserver(node_id).allowed_errors.append(
".*Scheduling error when marking pageserver.*offline.*",
)

failure.apply(env)

# ... expecting the heartbeats to mark it offline
def node_offline():
def nodes_offline():
nodes = env.storage_controller.node_list()
log.info(f"{nodes=}")
target = next(n for n in nodes if n["id"] == offline_node_id)
assert target["availability"] == "Offline"
for node in nodes:
if node["id"] in offline_node_ids:
assert node["availability"] == "Offline"

# A node is considered offline if the last successful heartbeat
# was more than 10 seconds ago (hardcoded in the storage controller).
wait_until(20, 1, node_offline)
wait_until(20, 1, nodes_offline)

# .. expecting the tenant on the offline node to be migrated
def tenant_migrated():
if len(online_node_ids) == 0:
time.sleep(5)
return

node_to_tenants = build_node_to_tenants_map(env)
log.info(f"{node_to_tenants=}")
assert set(node_to_tenants[online_node_id]) == set(tenant_ids)

observed_tenants = set()
for node_id in online_node_ids:
observed_tenants |= set(node_to_tenants[node_id])

assert observed_tenants == set(tenant_ids)

wait_until(10, 1, tenant_migrated)

@@ -1070,31 +1102,24 @@ def test_storage_controller_heartbeats(
failure.clear(env)

# ... expecting the offline node to become active again
def node_online():
def nodes_online():
nodes = env.storage_controller.node_list()
target = next(n for n in nodes if n["id"] == offline_node_id)
assert target["availability"] == "Active"
for node in nodes:
if node["id"] in online_node_ids:
assert node["availability"] == "Active"

wait_until(10, 1, node_online)
wait_until(10, 1, nodes_online)

time.sleep(5)

# ... then we create a new tenant
tid = TenantId.generate()
env.storage_controller.tenant_create(tid)

# ... expecting it to be placed on the node that just came back online
tenants = env.storage_controller.tenant_list()
newest_tenant = next(t for t in tenants if t["tenant_shard_id"] == str(tid))
locations = list(newest_tenant["observed"]["locations"].keys())
locations = [int(node_id) for node_id in locations]
assert locations == [offline_node_id]
node_to_tenants = build_node_to_tenants_map(env)
log.info(f"Back online: {node_to_tenants=}")

# ... expecting the storage controller to reach a consistent state
def storage_controller_consistent():
env.storage_controller.consistency_check()

wait_until(10, 1, storage_controller_consistent)
wait_until(30, 1, storage_controller_consistent)


def test_storage_controller_re_attach(neon_env_builder: NeonEnvBuilder):

@@ -678,10 +678,6 @@ def test_synthetic_size_while_deleting(neon_env_builder: NeonEnvBuilder):
with pytest.raises(PageserverApiException, match=matcher):
completion.result()

# this happens on both cases
env.pageserver.allowed_errors.append(
".*ignoring failure to find gc cutoffs: timeline shutting down.*"
)
# this happens only in the case of deletion (http response logging)
env.pageserver.allowed_errors.append(".*Failed to refresh gc_info before gathering inputs.*")


@@ -317,9 +317,9 @@ def test_broker(neon_env_builder: NeonEnvBuilder):
time.sleep(1)

# Ensure that safekeepers don't lose remote_consistent_lsn on restart.
# Control file is persisted each 5s. TODO: do that on shutdown and remove sleep.
time.sleep(6)
for sk in env.safekeepers:
# force persist cfile
sk.http_client().checkpoint(tenant_id, timeline_id)
sk.stop()
sk.start()
stat_after_restart = [cli.timeline_status(tenant_id, timeline_id) for cli in clients]
@@ -374,7 +374,7 @@ def test_wal_removal(neon_env_builder: NeonEnvBuilder, auth_enabled: bool):
http_cli_other = env.safekeepers[0].http_client(
auth_token=env.auth_keys.generate_tenant_token(TenantId.generate())
)
http_cli_noauth = env.safekeepers[0].http_client()
http_cli_noauth = env.safekeepers[0].http_client(gen_sk_wide_token=False)

# Pretend WAL is offloaded to s3.
if auth_enabled:
@@ -830,7 +830,7 @@ def test_timeline_status(neon_env_builder: NeonEnvBuilder, auth_enabled: bool):
auth_token=env.auth_keys.generate_tenant_token(TenantId.generate())
)
wa_http_cli_bad.check_status()
wa_http_cli_noauth = wa.http_client()
wa_http_cli_noauth = wa.http_client(gen_sk_wide_token=False)
wa_http_cli_noauth.check_status()

# debug endpoint requires safekeeper scope
@@ -964,7 +964,7 @@ def test_sk_auth(neon_env_builder: NeonEnvBuilder):

# By default, neon_local enables auth on all services if auth is configured,
# so http must require the token.
sk_http_cli_noauth = sk.http_client()
sk_http_cli_noauth = sk.http_client(gen_sk_wide_token=False)
sk_http_cli_auth = sk.http_client(auth_token=env.auth_keys.generate_tenant_token(tenant_id))
with pytest.raises(sk_http_cli_noauth.HTTPError, match="Forbidden|Unauthorized"):
sk_http_cli_noauth.timeline_status(tenant_id, timeline_id)
@@ -1640,7 +1640,7 @@ def test_delete_force(neon_env_builder: NeonEnvBuilder, auth_enabled: bool):
sk_http_other = sk.http_client(
auth_token=env.auth_keys.generate_tenant_token(tenant_id_other)
)
sk_http_noauth = sk.http_client()
sk_http_noauth = sk.http_client(gen_sk_wide_token=False)
assert (sk_data_dir / str(tenant_id) / str(timeline_id_1)).is_dir()
assert (sk_data_dir / str(tenant_id) / str(timeline_id_2)).is_dir()
assert (sk_data_dir / str(tenant_id) / str(timeline_id_3)).is_dir()
@@ -1723,7 +1723,13 @@ def test_delete_force(neon_env_builder: NeonEnvBuilder, auth_enabled: bool):
cur.execute("INSERT INTO t (key) VALUES (123)")


def test_pull_timeline(neon_env_builder: NeonEnvBuilder):
# Basic pull_timeline test.
# When live_sk_change is False, compute is restarted to change set of
# safekeepers; otherwise it is live reload.
@pytest.mark.parametrize("live_sk_change", [False, True])
def test_pull_timeline(neon_env_builder: NeonEnvBuilder, live_sk_change: bool):
neon_env_builder.auth_enabled = True

def execute_payload(endpoint: Endpoint):
with closing(endpoint.connect()) as conn:
with conn.cursor() as cur:
@@ -1739,7 +1745,7 @@ def test_pull_timeline(neon_env_builder: NeonEnvBuilder):

def show_statuses(safekeepers: List[Safekeeper], tenant_id: TenantId, timeline_id: TimelineId):
for sk in safekeepers:
http_cli = sk.http_client()
http_cli = sk.http_client(auth_token=env.auth_keys.generate_tenant_token(tenant_id))
try:
status = http_cli.timeline_status(tenant_id, timeline_id)
log.info(f"Safekeeper {sk.id} status: {status}")
@@ -1749,13 +1755,12 @@ def test_pull_timeline(neon_env_builder: NeonEnvBuilder):
neon_env_builder.num_safekeepers = 4
env = neon_env_builder.init_start()
tenant_id = env.initial_tenant
timeline_id = env.neon_cli.create_branch("test_pull_timeline")
timeline_id = env.initial_timeline

log.info("Use only first 3 safekeepers")
env.safekeepers[3].stop()
endpoint = env.endpoints.create("test_pull_timeline")
endpoint.active_safekeepers = [1, 2, 3]
endpoint.start()
endpoint = env.endpoints.create("main")
endpoint.start(safekeepers=[1, 2, 3])

execute_payload(endpoint)
show_statuses(env.safekeepers, tenant_id, timeline_id)
@@ -1767,29 +1772,22 @@ def test_pull_timeline(neon_env_builder: NeonEnvBuilder):
log.info("Initialize new safekeeper 4, pull data from 1 & 3")
env.safekeepers[3].start()

res = (
env.safekeepers[3]
.http_client()
.pull_timeline(
{
"tenant_id": str(tenant_id),
"timeline_id": str(timeline_id),
"http_hosts": [
f"http://localhost:{env.safekeepers[0].port.http}",
f"http://localhost:{env.safekeepers[2].port.http}",
],
}
)
res = env.safekeepers[3].pull_timeline(
[env.safekeepers[0], env.safekeepers[2]], tenant_id, timeline_id
)
log.info("Finished pulling timeline")
log.info(res)

show_statuses(env.safekeepers, tenant_id, timeline_id)

log.info("Restarting compute with new config to verify that it works")
endpoint.stop_and_destroy().create("test_pull_timeline")
endpoint.active_safekeepers = [1, 3, 4]
endpoint.start()
action = "reconfiguring" if live_sk_change else "restarting"
log.info(f"{action} compute with new config to verify that it works")
new_sks = [1, 3, 4]
if not live_sk_change:
endpoint.stop_and_destroy().create("main")
endpoint.start(safekeepers=new_sks)
else:
endpoint.reconfigure(safekeepers=new_sks)

execute_payload(endpoint)
show_statuses(env.safekeepers, tenant_id, timeline_id)
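The call above now goes through a Safekeeper.pull_timeline(...) helper instead of assembling the HTTP request body inline. A hedged sketch of what such a wrapper could look like, based only on the request shape visible in the removed lines (the real fixture helper may differ in signature and return type):

def pull_timeline(dst_sk, srcs, tenant_id, timeline_id):
    # Build the same body the removed inline code constructed by hand.
    body = {
        "tenant_id": str(tenant_id),
        "timeline_id": str(timeline_id),
        "http_hosts": [f"http://localhost:{sk.port.http}" for sk in srcs],
    }
    return dst_sk.http_client().pull_timeline(body)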
@@ -1816,8 +1814,8 @@ def test_pull_timeline(neon_env_builder: NeonEnvBuilder):
# 4) Do some write, verify integrity with timeline_digest.
# Expected to fail while holding off WAL gc plus fetching commit_lsn WAL
# segment is not implemented.
@pytest.mark.xfail
def test_pull_timeline_gc(neon_env_builder: NeonEnvBuilder):
neon_env_builder.auth_enabled = True
neon_env_builder.num_safekeepers = 3
neon_env_builder.enable_safekeeper_remote_storage(default_remote_storage())
env = neon_env_builder.init_start()
@@ -1836,27 +1834,36 @@ def test_pull_timeline_gc(neon_env_builder: NeonEnvBuilder):
src_flush_lsn = src_sk.get_flush_lsn(tenant_id, timeline_id)
log.info(f"flush_lsn on src before pull_timeline: {src_flush_lsn}")

dst_http = dst_sk.http_client()
src_http = src_sk.http_client()
# run pull_timeline which will halt before downloading files
dst_http.configure_failpoints(("sk-pull-timeline-after-list-pausable", "pause"))
src_http.configure_failpoints(("sk-snapshot-after-list-pausable", "pause"))
pt_handle = PropagatingThread(
target=dst_sk.pull_timeline, args=([src_sk], tenant_id, timeline_id)
)
pt_handle.start()
dst_sk.wait_until_paused("sk-pull-timeline-after-list-pausable")
src_sk.wait_until_paused("sk-snapshot-after-list-pausable")

# ensure segment exists
endpoint.safe_psql("insert into t select generate_series(1, 180000), 'papaya'")
lsn = last_flush_lsn_upload(env, endpoint, tenant_id, timeline_id)
lsn = last_flush_lsn_upload(
env,
endpoint,
tenant_id,
timeline_id,
auth_token=env.auth_keys.generate_tenant_token(tenant_id),
)
assert lsn > Lsn("0/2000000")
# Checkpoint timeline beyond lsn.
src_sk.checkpoint_up_to(tenant_id, timeline_id, lsn)
src_sk.checkpoint_up_to(tenant_id, timeline_id, lsn, wait_wal_removal=False)
first_segment_p = src_sk.timeline_dir(tenant_id, timeline_id) / "000000010000000000000001"
log.info(f"first segment exist={os.path.exists(first_segment_p)}")

dst_http.configure_failpoints(("sk-pull-timeline-after-list-pausable", "off"))
src_http.configure_failpoints(("sk-snapshot-after-list-pausable", "off"))
pt_handle.join()

# after pull_timeline is finished WAL should be removed on donor
src_sk.checkpoint_up_to(tenant_id, timeline_id, lsn, wait_wal_removal=True)

timeline_start_lsn = src_sk.get_timeline_start_lsn(tenant_id, timeline_id)
dst_flush_lsn = dst_sk.get_flush_lsn(tenant_id, timeline_id)
log.info(f"flush_lsn on dst after pull_timeline: {dst_flush_lsn}")
@@ -1883,8 +1890,8 @@ def test_pull_timeline_gc(neon_env_builder: NeonEnvBuilder):
# enough, so it won't be affected by term change anymore.
#
# Expected to fail while term check is not implemented.
@pytest.mark.xfail
def test_pull_timeline_term_change(neon_env_builder: NeonEnvBuilder):
neon_env_builder.auth_enabled = True
neon_env_builder.num_safekeepers = 3
neon_env_builder.enable_safekeeper_remote_storage(default_remote_storage())
env = neon_env_builder.init_start()
@@ -1900,14 +1907,14 @@ def test_pull_timeline_term_change(neon_env_builder: NeonEnvBuilder):
ep.safe_psql("create table t(key int, value text)")
ep.safe_psql("insert into t select generate_series(1, 1000), 'pear'")

dst_http = dst_sk.http_client()
src_http = src_sk.http_client()
# run pull_timeline which will halt before downloading files
dst_http.configure_failpoints(("sk-pull-timeline-after-list-pausable", "pause"))
src_http.configure_failpoints(("sk-snapshot-after-list-pausable", "pause"))
pt_handle = PropagatingThread(
target=dst_sk.pull_timeline, args=([src_sk], tenant_id, timeline_id)
)
pt_handle.start()
dst_sk.wait_until_paused("sk-pull-timeline-after-list-pausable")
src_sk.wait_until_paused("sk-snapshot-after-list-pausable")

src_http = src_sk.http_client()
term_before = src_http.timeline_status(tenant_id, timeline_id).term
@@ -1922,7 +1929,7 @@ def test_pull_timeline_term_change(neon_env_builder: NeonEnvBuilder):
term_after = src_http.timeline_status(tenant_id, timeline_id).term
assert term_after > term_before, f"term_after={term_after}, term_before={term_before}"

dst_http.configure_failpoints(("sk-pull-timeline-after-list-pausable", "off"))
src_http.configure_failpoints(("sk-snapshot-after-list-pausable", "off"))
with pytest.raises(requests.exceptions.HTTPError):
pt_handle.join()

@@ -601,13 +601,16 @@ async def run_segment_init_failure(env: NeonEnv):
conn = await ep.connect_async()
ep.safe_psql("select pg_switch_wal()") # jump to the segment boundary
# next insertion should hang until failpoint is disabled.
asyncio.create_task(conn.execute("insert into t select generate_series(1,1), 'payload'"))
bg_query = asyncio.create_task(
conn.execute("insert into t select generate_series(1,1), 'payload'")
)
sleep_sec = 2
await asyncio.sleep(sleep_sec)
# also restart ep at segment boundary to make test more interesting
ep.stop()
# it must still be not finished
# assert not bg_query.done()
assert not bg_query.done()
# Also restart ep at segment boundary to make test more interesting. Do it in immediate mode;
# fast will hang because it will try to gracefully finish sending WAL.
ep.stop(mode="immediate")
# Without segment rename during init (#6402) previous statement created
# partially initialized 16MB segment, so sk restart also triggers #6401.
sk.stop().start()

@@ -18,7 +18,7 @@ commands:
- name: postgres-exporter
user: nobody
sysvInitAction: respawn
shell: 'DATA_SOURCE_NAME="user=cloud_admin sslmode=disable dbname=postgres" /bin/postgres_exporter'
shell: 'DATA_SOURCE_NAME="user=cloud_admin sslmode=disable dbname=postgres application_name=postgres-exporter" /bin/postgres_exporter'
- name: sql-exporter
user: nobody
sysvInitAction: respawn
@@ -93,7 +93,7 @@ files:
target:
# Data source name always has a URI schema that matches the driver name. In some cases (e.g. MySQL)
# the schema gets dropped or replaced to match the driver expected DSN format.
data_source_name: 'postgresql://cloud_admin@127.0.0.1:5432/postgres?sslmode=disable'
data_source_name: 'postgresql://cloud_admin@127.0.0.1:5432/postgres?sslmode=disable&application_name=sql_exporter'

# Collectors (referenced by name) to execute on the target.
# Glob patterns are supported (see <https://pkg.go.dev/path/filepath#Match> for syntax).
@@ -128,7 +128,7 @@ files:
target:
# Data source name always has a URI schema that matches the driver name. In some cases (e.g. MySQL)
# the schema gets dropped or replaced to match the driver expected DSN format.
data_source_name: 'postgresql://cloud_admin@127.0.0.1:5432/postgres?sslmode=disable'
data_source_name: 'postgresql://cloud_admin@127.0.0.1:5432/postgres?sslmode=disable&application_name=sql_exporter_autoscaling'

# Collectors (referenced by name) to execute on the target.
# Glob patterns are supported (see <https://pkg.go.dev/path/filepath#Match> for syntax).
@@ -304,7 +304,9 @@ files:
- slot_name
values: [restart_lsn]
query: |
select slot_name, (restart_lsn - '0/0')::FLOAT8 from pg_replication_slots where slot_type = 'logical';
select slot_name, (restart_lsn - '0/0')::FLOAT8 as restart_lsn
from pg_replication_slots
where slot_type = 'logical';

- metric_name: retained_wal
type: gauge
@@ -322,14 +324,15 @@ files:
help: 'Whether or not the replication slot wal_status is lost'
key_labels:
- slot_name
values: [wal_status_is_lost]
values: [wal_is_lost]
query: |
SELECT slot_name,
CASE
WHEN wal_status = 'lost' THEN 1
ELSE 0
END AS wal_status_is_lost
END AS wal_is_lost
FROM pg_replication_slots;

- filename: neon_collector_autoscaling.yml
content: |
collector_name: neon_collector_autoscaling
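The values: key of a sql_exporter metric is expected to name a column returned by its query, which is why the column alias and the values entry are renamed to wal_is_lost together. A quick hedged check of the query itself from Python (psycopg2 and the local DSN are assumptions, not part of this change):

import psycopg2

# Placeholder DSN; point it at an instance that has logical replication slots.
with psycopg2.connect("postgresql://cloud_admin@127.0.0.1:5432/postgres") as conn:
    with conn.cursor() as cur:
        cur.execute(
            """
            SELECT slot_name,
                   CASE WHEN wal_status = 'lost' THEN 1 ELSE 0 END AS wal_is_lost
            FROM pg_replication_slots;
            """
        )
        for slot_name, wal_is_lost in cur.fetchall():
            print(slot_name, wal_is_lost)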