Compare commits

..

11 Commits

Author SHA1 Message Date
Christian Schwarz
a1063ead56 run just the flaky test in CI, ten times 2024-06-12 21:15:42 +02:00
Christian Schwarz
246c4c5ccf minimum amount of work 2024-06-12 21:15:28 +02:00
Christian Schwarz
e2d57fb61b Revert "make sure the getpage requests happens"
This reverts commit a62b2e3e76.
2024-06-12 21:11:43 +02:00
Christian Schwarz
a62b2e3e76 make sure the getpage requests happens 2024-06-12 21:11:33 +02:00
Christian Schwarz
4a190986b4 test_vm_bit_clear_on_heap_lock_blackbox: dump layer map while flaky code is running
refs https://github.com/neondatabase/neon/issues/6967
2024-06-12 21:09:35 +02:00
Christian Schwarz
17c51ad63a Merge remote-tracking branch 'origin/main' into bayandin/preserve_database_files-on-CI 2024-06-12 08:25:39 +02:00
Alexander Bayandin
29a2524788 A little cleanup 2024-06-10 11:30:19 +01:00
Alexander Bayandin
35ff8fc6c1 Explain preserve_database_files propogation a bit more
Co-authored-by: Christian Schwarz <christian@neon.tech>
2024-06-10 11:26:29 +01:00
Alexander Bayandin
184547634e CI(test_runner): do not preserve_database_files for test_sharding 2024-06-10 11:14:27 +01:00
Alexander Bayandin
070136e07e CI(test_runner): preserve_database_files for test_vm_bit_clear_on_heap_lock 2024-06-10 11:14:27 +01:00
Alexander Bayandin
7d9ea26530 CI(test_runner): Upload all test artifacts if preserve_database_files is enabled 2024-06-10 11:14:27 +01:00
126 changed files with 1587 additions and 3805 deletions

View File

@@ -99,7 +99,7 @@ jobs:
# Set --sparse-ordering option of pytest-order plugin
# to ensure tests are running in order of appears in the file.
# It's important for test_perf_pgbench.py::test_pgbench_remote_* tests
extra_params: -m remote_cluster --sparse-ordering --timeout 5400 --ignore test_runner/performance/test_perf_olap.py --ignore test_runner/performance/test_perf_pgvector_queries.py
extra_params: -m remote_cluster --sparse-ordering --timeout 5400 --ignore test_runner/performance/test_perf_olap.py
env:
BENCHMARK_CONNSTR: ${{ steps.create-neon-project.outputs.dsn }}
VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"
@@ -410,14 +410,14 @@ jobs:
PERF_TEST_RESULT_CONNSTR: "${{ secrets.PERF_TEST_RESULT_CONNSTR }}"
BENCHMARK_CONNSTR: ${{ steps.set-up-connstr.outputs.connstr }}
- name: Benchmark pgvector queries
- name: Benchmark pgvector hnsw queries
uses: ./.github/actions/run-python-test-set
with:
build_type: ${{ env.BUILD_TYPE }}
test_selection: performance/test_perf_pgvector_queries.py
test_selection: performance
run_in_parallel: false
save_perf_report: ${{ env.SAVE_PERF_REPORT }}
extra_params: -m remote_cluster --timeout 21600
extra_params: -m remote_cluster --timeout 21600 -k test_pgbench_remote_pgvector
env:
BENCHMARK_CONNSTR: ${{ steps.set-up-connstr.outputs.connstr }}
VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"

View File

@@ -30,6 +30,7 @@ jobs:
check-image:
uses: ./.github/workflows/check-build-tools-image.yml
# This job uses older version of GitHub Actions because it's run on gen2 runners, which don't support node 20 (for newer versions)
build-image:
needs: [ check-image ]
if: needs.check-image.outputs.found == 'false'
@@ -54,7 +55,7 @@ jobs:
exit 1
fi
- uses: actions/checkout@v4
- uses: actions/checkout@v3
# Use custom DOCKER_CONFIG directory to avoid conflicts with default settings
# The default value is ~/.docker

View File

@@ -299,21 +299,21 @@ jobs:
uses: actions/cache@v4
with:
path: pg_install/v14
key: v1-${{ runner.os }}-${{ matrix.build_type }}-pg-${{ steps.pg_v14_rev.outputs.pg_rev }}-${{ hashFiles('Makefile', 'Dockerfile.build-tools') }}
key: v1-${{ runner.os }}-${{ matrix.build_type }}-pg-${{ steps.pg_v14_rev.outputs.pg_rev }}-${{ hashFiles('Makefile') }}
- name: Cache postgres v15 build
id: cache_pg_15
uses: actions/cache@v4
with:
path: pg_install/v15
key: v1-${{ runner.os }}-${{ matrix.build_type }}-pg-${{ steps.pg_v15_rev.outputs.pg_rev }}-${{ hashFiles('Makefile', 'Dockerfile.build-tools') }}
key: v1-${{ runner.os }}-${{ matrix.build_type }}-pg-${{ steps.pg_v15_rev.outputs.pg_rev }}-${{ hashFiles('Makefile') }}
- name: Cache postgres v16 build
id: cache_pg_16
uses: actions/cache@v4
with:
path: pg_install/v16
key: v1-${{ runner.os }}-${{ matrix.build_type }}-pg-${{ steps.pg_v16_rev.outputs.pg_rev }}-${{ hashFiles('Makefile', 'Dockerfile.build-tools') }}
key: v1-${{ runner.os }}-${{ matrix.build_type }}-pg-${{ steps.pg_v16_rev.outputs.pg_rev }}-${{ hashFiles('Makefile') }}
- name: Build postgres v14
if: steps.cache_pg_14.outputs.cache-hit != 'true'
@@ -337,8 +337,34 @@ jobs:
run: |
${cov_prefix} mold -run cargo build $CARGO_FLAGS $CARGO_FEATURES --bins --tests
# Do install *before* running rust tests because they might recompile the
# binaries with different features/flags.
- name: Run rust tests
env:
NEXTEST_RETRIES: 3
run: |
#nextest does not yet support running doctests
cargo test --doc $CARGO_FLAGS $CARGO_FEATURES
for io_engine in std-fs tokio-epoll-uring ; do
NEON_PAGESERVER_UNIT_TEST_VIRTUAL_FILE_IOENGINE=$io_engine ${cov_prefix} cargo nextest run $CARGO_FLAGS $CARGO_FEATURES
done
# Run separate tests for real S3
export ENABLE_REAL_S3_REMOTE_STORAGE=nonempty
export REMOTE_STORAGE_S3_BUCKET=neon-github-ci-tests
export REMOTE_STORAGE_S3_REGION=eu-central-1
# Avoid `$CARGO_FEATURES` since there's no `testing` feature in the e2e tests now
${cov_prefix} cargo nextest run $CARGO_FLAGS -E 'package(remote_storage)' -E 'test(test_real_s3)'
# Run separate tests for real Azure Blob Storage
# XXX: replace region with `eu-central-1`-like region
export ENABLE_REAL_AZURE_REMOTE_STORAGE=y
export AZURE_STORAGE_ACCOUNT="${{ secrets.AZURE_STORAGE_ACCOUNT_DEV }}"
export AZURE_STORAGE_ACCESS_KEY="${{ secrets.AZURE_STORAGE_ACCESS_KEY_DEV }}"
export REMOTE_STORAGE_AZURE_CONTAINER="${{ vars.REMOTE_STORAGE_AZURE_CONTAINER }}"
export REMOTE_STORAGE_AZURE_REGION="${{ vars.REMOTE_STORAGE_AZURE_REGION }}"
# Avoid `$CARGO_FEATURES` since there's no `testing` feature in the e2e tests now
${cov_prefix} cargo nextest run $CARGO_FLAGS -E 'package(remote_storage)' -E 'test(test_real_azure)'
- name: Install rust binaries
run: |
# Install target binaries
@@ -379,32 +405,6 @@ jobs:
done
fi
- name: Run rust tests
env:
NEXTEST_RETRIES: 3
run: |
#nextest does not yet support running doctests
cargo test --doc $CARGO_FLAGS $CARGO_FEATURES
for io_engine in std-fs tokio-epoll-uring ; do
NEON_PAGESERVER_UNIT_TEST_VIRTUAL_FILE_IOENGINE=$io_engine ${cov_prefix} cargo nextest run $CARGO_FLAGS $CARGO_FEATURES
done
# Run separate tests for real S3
export ENABLE_REAL_S3_REMOTE_STORAGE=nonempty
export REMOTE_STORAGE_S3_BUCKET=neon-github-ci-tests
export REMOTE_STORAGE_S3_REGION=eu-central-1
${cov_prefix} cargo nextest run $CARGO_FLAGS $CARGO_FEATURES -E 'package(remote_storage)' -E 'test(test_real_s3)'
# Run separate tests for real Azure Blob Storage
# XXX: replace region with `eu-central-1`-like region
export ENABLE_REAL_AZURE_REMOTE_STORAGE=y
export AZURE_STORAGE_ACCOUNT="${{ secrets.AZURE_STORAGE_ACCOUNT_DEV }}"
export AZURE_STORAGE_ACCESS_KEY="${{ secrets.AZURE_STORAGE_ACCESS_KEY_DEV }}"
export REMOTE_STORAGE_AZURE_CONTAINER="${{ vars.REMOTE_STORAGE_AZURE_CONTAINER }}"
export REMOTE_STORAGE_AZURE_REGION="${{ vars.REMOTE_STORAGE_AZURE_REGION }}"
${cov_prefix} cargo nextest run $CARGO_FLAGS $CARGO_FEATURES -E 'package(remote_storage)' -E 'test(test_real_azure)'
- name: Install postgres binaries
run: cp -a pg_install /tmp/neon/pg_install
@@ -447,6 +447,7 @@ jobs:
with:
build_type: ${{ matrix.build_type }}
test_selection: regress
extra_params: -k test_vm_bit_clear_on_heap_lock_blackbox
needs_postgres_source: true
run_with_real_s3: true
real_s3_bucket: neon-github-ci-tests
@@ -965,7 +966,7 @@ jobs:
steps:
- name: Checkout
uses: actions/checkout@v4
uses: actions/checkout@v1
with:
fetch-depth: 0
@@ -1070,8 +1071,7 @@ jobs:
username: ${{ secrets.NEON_DOCKERHUB_USERNAME }}
password: ${{ secrets.NEON_DOCKERHUB_PASSWORD }}
- name: Login to dev ECR
uses: docker/login-action@v3
- uses: docker/login-action@v3
with:
registry: 369495373322.dkr.ecr.eu-central-1.amazonaws.com
username: ${{ secrets.AWS_ACCESS_KEY_DEV }}
@@ -1102,24 +1102,6 @@ jobs:
$repo/vm-compute-node-${version}:${{ needs.tag.outputs.build-tag }}
done
done
docker buildx imagetools create -t neondatabase/neon-test-extensions-v16:latest \
neondatabase/neon-test-extensions-v16:${{ needs.tag.outputs.build-tag }}
- name: Login to prod ECR
uses: docker/login-action@v3
if: github.ref_name == 'release'|| github.ref_name == 'release-proxy'
with:
registry: 093970136003.dkr.ecr.eu-central-1.amazonaws.com
username: ${{ secrets.PROD_GHA_RUNNER_LIMITED_AWS_ACCESS_KEY_ID }}
password: ${{ secrets.PROD_GHA_RUNNER_LIMITED_AWS_SECRET_ACCESS_KEY }}
- name: Copy all images to prod ECR
if: github.ref_name == 'release'|| github.ref_name == 'release-proxy'
run: |
for image in neon compute-tools {vm-,}compute-node-{v14,v15,v16}; do
docker buildx imagetools create -t 093970136003.dkr.ecr.eu-central-1.amazonaws.com/${image}:${{ needs.tag.outputs.build-tag }} \
369495373322.dkr.ecr.eu-central-1.amazonaws.com/${image}:${{ needs.tag.outputs.build-tag }}
done
trigger-custom-extensions-build-and-wait:
needs: [ check-permissions, tag ]

View File

@@ -25,17 +25,26 @@ jobs:
found: ${{ steps.check-image.outputs.found }}
steps:
- uses: actions/checkout@v4
- name: Get build-tools image tag for the current commit
id: get-build-tools-tag
env:
IMAGE_TAG: |
${{ hashFiles('Dockerfile.build-tools',
'.github/workflows/check-build-tools-image.yml',
'.github/workflows/build-build-tools-image.yml') }}
# Usually, for COMMIT_SHA, we use `github.event.pull_request.head.sha || github.sha`, but here, even for PRs,
# we want to use `github.sha` i.e. point to a phantom merge commit to determine the image tag correctly.
COMMIT_SHA: ${{ github.sha }}
GH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
run: |
echo "image-tag=${IMAGE_TAG}" | tee -a $GITHUB_OUTPUT
LAST_BUILD_TOOLS_SHA=$(
gh api \
-H "Accept: application/vnd.github+json" \
-H "X-GitHub-Api-Version: 2022-11-28" \
--method GET \
--field path=Dockerfile.build-tools \
--field sha=${COMMIT_SHA} \
--field per_page=1 \
--jq ".[0].sha" \
"/repos/${GITHUB_REPOSITORY}/commits"
)
echo "image-tag=${LAST_BUILD_TOOLS_SHA}" | tee -a $GITHUB_OUTPUT
- name: Check if such tag found in the registry
id: check-image

5
Cargo.lock generated
View File

@@ -1014,9 +1014,6 @@ name = "camino"
version = "1.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c59e92b5a388f549b863a7bea62612c09f24c8393560709a54558a9abdfb3b9c"
dependencies = [
"serde",
]
[[package]]
name = "camino-tempfile"
@@ -5161,7 +5158,6 @@ dependencies = [
"tokio-io-timeout",
"tokio-postgres",
"tokio-stream",
"tokio-tar",
"tokio-util",
"toml_edit",
"tracing",
@@ -5757,7 +5753,6 @@ dependencies = [
"r2d2",
"reqwest 0.12.4",
"routerify",
"scopeguard",
"serde",
"serde_json",
"strum",

View File

@@ -120,7 +120,7 @@ num_cpus = "1.15"
num-traits = "0.2.15"
once_cell = "1.13"
opentelemetry = "0.20.0"
opentelemetry-otlp = { version = "0.13.0", default-features=false, features = ["http-proto", "trace", "http", "reqwest-client"] }
opentelemetry-otlp = { version = "0.13.0", default_features=false, features = ["http-proto", "trace", "http", "reqwest-client"] }
opentelemetry-semantic-conventions = "0.12.0"
parking_lot = "0.12"
parquet = { version = "51.0.0", default-features = false, features = ["zstd"] }
@@ -128,7 +128,7 @@ parquet_derive = "51.0.0"
pbkdf2 = { version = "0.12.1", features = ["simple", "std"] }
pin-project-lite = "0.2"
procfs = "0.14"
prometheus = {version = "0.13", default-features=false, features = ["process"]} # removes protobuf dependency
prometheus = {version = "0.13", default_features=false, features = ["process"]} # removes protobuf dependency
prost = "0.11"
rand = "0.8"
redis = { version = "0.25.2", features = ["tokio-rustls-comp", "keep-alive"] }
@@ -184,7 +184,7 @@ tower-service = "0.3.2"
tracing = "0.1"
tracing-error = "0.2.0"
tracing-opentelemetry = "0.21.0"
tracing-subscriber = { version = "0.3", default-features = false, features = ["smallvec", "fmt", "tracing-log", "std", "env-filter", "json", "ansi"] }
tracing-subscriber = { version = "0.3", default_features = false, features = ["smallvec", "fmt", "tracing-log", "std", "env-filter", "json", "ansi"] }
twox-hash = { version = "1.6.3", default-features = false }
url = "2.2"
urlencoding = "2.1"

View File

@@ -69,6 +69,8 @@ RUN set -e \
&& apt install -y \
libreadline-dev \
libseccomp-dev \
libicu67 \
openssl \
ca-certificates \
&& rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/* \
&& useradd -d /data neon \

View File

@@ -112,45 +112,6 @@ RUN for package in Capture::Tiny DateTime Devel::Cover Digest::MD5 File::Spec JS
&& make install \
&& rm -rf ../lcov.tar.gz
# Compile and install the static OpenSSL library
ENV OPENSSL_VERSION=3.2.2
ENV OPENSSL_PREFIX=/usr/local/openssl
RUN wget -O /tmp/openssl-${OPENSSL_VERSION}.tar.gz https://www.openssl.org/source/openssl-${OPENSSL_VERSION}.tar.gz && \
echo "197149c18d9e9f292c43f0400acaba12e5f52cacfe050f3d199277ea738ec2e7 /tmp/openssl-${OPENSSL_VERSION}.tar.gz" | sha256sum --check && \
cd /tmp && \
tar xzvf /tmp/openssl-${OPENSSL_VERSION}.tar.gz && \
rm /tmp/openssl-${OPENSSL_VERSION}.tar.gz && \
cd /tmp/openssl-${OPENSSL_VERSION} && \
./config --prefix=${OPENSSL_PREFIX} -static --static no-shared -fPIC && \
make -j "$(nproc)" && \
make install && \
cd /tmp && \
rm -rf /tmp/openssl-${OPENSSL_VERSION}
# Use the same version of libicu as the compute nodes so that
# clusters created using inidb on pageserver can be used by computes.
#
# TODO: at this time, Dockerfile.compute-node uses the debian bullseye libicu
# package, which is 67.1. We're duplicating that knowledge here, and also, technically,
# Debian has a few patches on top of 67.1 that we're not adding here.
ENV ICU_VERSION=67.1
ENV ICU_PREFIX=/usr/local/icu
# Download and build static ICU
RUN wget -O /tmp/libicu-${ICU_VERSION}.tgz https://github.com/unicode-org/icu/releases/download/release-${ICU_VERSION//./-}/icu4c-${ICU_VERSION//./_}-src.tgz && \
echo "94a80cd6f251a53bd2a997f6f1b5ac6653fe791dfab66e1eb0227740fb86d5dc /tmp/libicu-${ICU_VERSION}.tgz" | sha256sum --check && \
mkdir /tmp/icu && \
pushd /tmp/icu && \
tar -xzf /tmp/libicu-${ICU_VERSION}.tgz && \
pushd icu/source && \
./configure --prefix=${ICU_PREFIX} --enable-static --enable-shared=no CXXFLAGS="-fPIC" CFLAGS="-fPIC" && \
make -j "$(nproc)" && \
make install && \
popd && \
rm -rf icu && \
rm -f /tmp/libicu-${ICU_VERSION}.tgz && \
popd
# Switch to nonroot user
USER nonroot:nonroot
WORKDIR /home/nonroot
@@ -180,7 +141,7 @@ WORKDIR /home/nonroot
# Rust
# Please keep the version of llvm (installed above) in sync with rust llvm (`rustc --version --verbose | grep LLVM`)
ENV RUSTC_VERSION=1.79.0
ENV RUSTC_VERSION=1.78.0
ENV RUSTUP_HOME="/home/nonroot/.rustup"
ENV PATH="/home/nonroot/.cargo/bin:${PATH}"
RUN curl -sSO https://static.rust-lang.org/rustup/dist/$(uname -m)-unknown-linux-gnu/rustup-init && whoami && \
@@ -209,6 +170,3 @@ RUN whoami \
&& rustup --version --verbose \
&& rustc --version --verbose \
&& clang --version
# Set following flag to check in Makefile if its running in Docker
RUN touch /home/nonroot/.docker_build

View File

@@ -246,8 +246,8 @@ COPY patches/pgvector.patch /pgvector.patch
# By default, pgvector Makefile uses `-march=native`. We don't want that,
# because we build the images on different machines than where we run them.
# Pass OPTFLAGS="" to remove it.
RUN wget https://github.com/pgvector/pgvector/archive/refs/tags/v0.7.2.tar.gz -O pgvector.tar.gz && \
echo "617fba855c9bcb41a2a9bc78a78567fd2e147c72afd5bf9d37b31b9591632b30 pgvector.tar.gz" | sha256sum --check && \
RUN wget https://github.com/pgvector/pgvector/archive/refs/tags/v0.7.1.tar.gz -O pgvector.tar.gz && \
echo "fe6c8cb4e0cd1a8cb60f5badf9e1701e0fcabcfc260931c26d01e155c4dd21d1 pgvector.tar.gz" | sha256sum --check && \
mkdir pgvector-src && cd pgvector-src && tar xzf ../pgvector.tar.gz --strip-components=1 -C . && \
patch -p1 < /pgvector.patch && \
make -j $(getconf _NPROCESSORS_ONLN) OPTFLAGS="" PG_CONFIG=/usr/local/pgsql/bin/pg_config && \
@@ -979,7 +979,7 @@ RUN cd /ext-src/ && for f in *.tar.gz; \
do echo $f; dname=$(echo $f | sed 's/\.tar.*//')-src; \
rm -rf $dname; mkdir $dname; tar xzf $f --strip-components=1 -C $dname \
|| exit 1; rm -f $f; done
RUN cd /ext-src/pgvector-src && patch -p1 <../pgvector.patch
RUN cd /ext-src/pgvector-src && patch -p1 <../pgvector.patch
# cmake is required for the h3 test
RUN apt-get update && apt-get install -y cmake
RUN patch -p1 < /ext-src/pg_hintplan.patch

View File

@@ -3,9 +3,6 @@ ROOT_PROJECT_DIR := $(dir $(abspath $(lastword $(MAKEFILE_LIST))))
# Where to install Postgres, default is ./pg_install, maybe useful for package managers
POSTGRES_INSTALL_DIR ?= $(ROOT_PROJECT_DIR)/pg_install/
OPENSSL_PREFIX_DIR := /usr/local/openssl
ICU_PREFIX_DIR := /usr/local/icu
#
# We differentiate between release / debug build types using the BUILD_TYPE
# environment variable.
@@ -23,16 +20,6 @@ else
$(error Bad build type '$(BUILD_TYPE)', see Makefile for options)
endif
ifeq ($(shell test -e /home/nonroot/.docker_build && echo -n yes),yes)
# Exclude static build openssl, icu for local build (MacOS, Linux)
# Only keep for build type release and debug
PG_CFLAGS += -I$(OPENSSL_PREFIX_DIR)/include
PG_CONFIGURE_OPTS += --with-icu
PG_CONFIGURE_OPTS += ICU_CFLAGS='-I/$(ICU_PREFIX_DIR)/include -DU_STATIC_IMPLEMENTATION'
PG_CONFIGURE_OPTS += ICU_LIBS='-L$(ICU_PREFIX_DIR)/lib -L$(ICU_PREFIX_DIR)/lib64 -licui18n -licuuc -licudata -lstdc++ -Wl,-Bdynamic -lm'
PG_CONFIGURE_OPTS += LDFLAGS='-L$(OPENSSL_PREFIX_DIR)/lib -L$(OPENSSL_PREFIX_DIR)/lib64 -L$(ICU_PREFIX_DIR)/lib -L$(ICU_PREFIX_DIR)/lib64 -Wl,-Bstatic -lssl -lcrypto -Wl,-Bdynamic -lrt -lm -ldl -lpthread'
endif
UNAME_S := $(shell uname -s)
ifeq ($(UNAME_S),Linux)
# Seccomp BPF is only available for Linux
@@ -41,7 +28,7 @@ else ifeq ($(UNAME_S),Darwin)
ifndef DISABLE_HOMEBREW
# macOS with brew-installed openssl requires explicit paths
# It can be configured with OPENSSL_PREFIX variable
OPENSSL_PREFIX := $(shell brew --prefix openssl@3)
OPENSSL_PREFIX ?= $(shell brew --prefix openssl@3)
PG_CONFIGURE_OPTS += --with-includes=$(OPENSSL_PREFIX)/include --with-libraries=$(OPENSSL_PREFIX)/lib
PG_CONFIGURE_OPTS += PKG_CONFIG_PATH=$(shell brew --prefix icu4c)/lib/pkgconfig
# macOS already has bison and flex in the system, but they are old and result in postgres-v14 target failure

View File

@@ -735,7 +735,7 @@ fn cli() -> clap::Command {
Arg::new("filecache-connstr")
.long("filecache-connstr")
.default_value(
"host=localhost port=5432 dbname=postgres user=cloud_admin sslmode=disable application_name=vm-monitor",
"host=localhost port=5432 dbname=postgres user=cloud_admin sslmode=disable",
)
.value_name("FILECACHE_CONNSTR"),
)

View File

@@ -918,39 +918,38 @@ impl ComputeNode {
// temporarily reset max_cluster_size in config
// to avoid the possibility of hitting the limit, while we are reconfiguring:
// creating new extensions, roles, etc...
config::with_compute_ctl_tmp_override(pgdata_path, "neon.max_cluster_size=-1", || {
self.pg_reload_conf()?;
config::compute_ctl_temp_override_create(pgdata_path, "neon.max_cluster_size=-1")?;
self.pg_reload_conf()?;
let mut client = Client::connect(self.connstr.as_str(), NoTls)?;
let mut client = Client::connect(self.connstr.as_str(), NoTls)?;
// Proceed with post-startup configuration. Note, that order of operations is important.
// Disable DDL forwarding because control plane already knows about these roles/databases.
if spec.mode == ComputeMode::Primary {
client.simple_query("SET neon.forward_ddl = false")?;
cleanup_instance(&mut client)?;
handle_roles(&spec, &mut client)?;
handle_databases(&spec, &mut client)?;
handle_role_deletions(&spec, self.connstr.as_str(), &mut client)?;
handle_grants(
&spec,
&mut client,
self.connstr.as_str(),
self.has_feature(ComputeFeature::AnonExtension),
)?;
handle_extensions(&spec, &mut client)?;
handle_extension_neon(&mut client)?;
// We can skip handle_migrations here because a new migration can only appear
// if we have a new version of the compute_ctl binary, which can only happen
// if compute got restarted, in which case we'll end up inside of apply_config
// instead of reconfigure.
}
// Proceed with post-startup configuration. Note, that order of operations is important.
// Disable DDL forwarding because control plane already knows about these roles/databases.
if spec.mode == ComputeMode::Primary {
client.simple_query("SET neon.forward_ddl = false")?;
cleanup_instance(&mut client)?;
handle_roles(&spec, &mut client)?;
handle_databases(&spec, &mut client)?;
handle_role_deletions(&spec, self.connstr.as_str(), &mut client)?;
handle_grants(
&spec,
&mut client,
self.connstr.as_str(),
self.has_feature(ComputeFeature::AnonExtension),
)?;
handle_extensions(&spec, &mut client)?;
handle_extension_neon(&mut client)?;
// We can skip handle_migrations here because a new migration can only appear
// if we have a new version of the compute_ctl binary, which can only happen
// if compute got restarted, in which case we'll end up inside of apply_config
// instead of reconfigure.
}
// 'Close' connection
drop(client);
Ok(())
})?;
// 'Close' connection
drop(client);
// reset max_cluster_size in config back to original value and reload config
config::compute_ctl_temp_override_remove(pgdata_path)?;
self.pg_reload_conf()?;
let unknown_op = "unknown".to_string();
@@ -1041,17 +1040,12 @@ impl ComputeNode {
// temporarily reset max_cluster_size in config
// to avoid the possibility of hitting the limit, while we are applying config:
// creating new extensions, roles, etc...
config::with_compute_ctl_tmp_override(
pgdata_path,
"neon.max_cluster_size=-1",
|| {
self.pg_reload_conf()?;
config::compute_ctl_temp_override_create(pgdata_path, "neon.max_cluster_size=-1")?;
self.pg_reload_conf()?;
self.apply_config(&compute_state)?;
self.apply_config(&compute_state)?;
Ok(())
},
)?;
config::compute_ctl_temp_override_remove(pgdata_path)?;
self.pg_reload_conf()?;
}
self.post_apply_config()?;

View File

@@ -131,17 +131,18 @@ pub fn write_postgres_conf(
Ok(())
}
pub fn with_compute_ctl_tmp_override<F>(pgdata_path: &Path, options: &str, exec: F) -> Result<()>
where
F: FnOnce() -> Result<()>,
{
/// create file compute_ctl_temp_override.conf in pgdata_dir
/// add provided options to this file
pub fn compute_ctl_temp_override_create(pgdata_path: &Path, options: &str) -> Result<()> {
let path = pgdata_path.join("compute_ctl_temp_override.conf");
let mut file = File::create(path)?;
write!(file, "{}", options)?;
let res = exec();
file.set_len(0)?;
res
Ok(())
}
/// remove file compute_ctl_temp_override.conf in pgdata_dir
pub fn compute_ctl_temp_override_remove(pgdata_path: &Path) -> Result<()> {
let path = pgdata_path.join("compute_ctl_temp_override.conf");
std::fs::remove_file(path)?;
Ok(())
}

View File

@@ -17,7 +17,7 @@ use hyper::header::CONTENT_TYPE;
use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Method, Request, Response, Server, StatusCode};
use tokio::task;
use tracing::{debug, error, info, warn};
use tracing::{error, info, warn};
use tracing_utils::http::OtelName;
use utils::http::request::must_get_query_param;
@@ -48,7 +48,7 @@ async fn routes(req: Request<Body>, compute: &Arc<ComputeNode>) -> Response<Body
match (req.method(), req.uri().path()) {
// Serialized compute state.
(&Method::GET, "/status") => {
debug!("serving /status GET request");
info!("serving /status GET request");
let state = compute.state.lock().unwrap();
let status_response = status_response_from_state(&state);
Response::new(Body::from(serde_json::to_string(&status_response).unwrap()))

View File

@@ -4,6 +4,10 @@ version = "0.1.0"
edition.workspace = true
license.workspace = true
[features]
default = []
testing = [ "pageserver_api/testing" ]
[dependencies]
anyhow.workspace = true
async-trait.workspace = true

View File

@@ -69,9 +69,6 @@ where
// Not generic AsRef<OsStr>, otherwise empty `envs` prevents type inference
EI: IntoIterator<Item = (String, String)>,
{
if !datadir.metadata().context("stat datadir")?.is_dir() {
anyhow::bail!("`datadir` must be a directory when calling this function: {datadir:?}");
}
let log_path = datadir.join(format!("{process_name}.log"));
let process_log_file = fs::OpenOptions::new()
.create(true)
@@ -88,13 +85,7 @@ where
let background_command = command
.stdout(process_log_file)
.stderr(same_file_for_stderr)
.args(args)
// spawn all child processes in their datadir, useful for all kinds of things,
// not least cleaning up child processes e.g. after an unclean exit from the test suite:
// ```
// lsof -d cwd -a +D Users/cs/src/neon/test_output
// ```
.current_dir(datadir);
.args(args);
let filled_cmd = fill_env_vars_prefixed_neon(fill_remote_storage_secrets_vars(
fill_rust_env_vars(background_command),

View File

@@ -87,8 +87,7 @@ fn main() -> Result<()> {
handle_init(sub_args).map(Some)
} else {
// all other commands need an existing config
let mut env =
LocalEnv::load_config(&local_env::base_path()).context("Error loading config")?;
let mut env = LocalEnv::load_config().context("Error loading config")?;
let original_env = env.clone();
let rt = tokio::runtime::Builder::new_current_thread()
@@ -365,8 +364,7 @@ fn handle_init(init_match: &ArgMatches) -> anyhow::Result<LocalEnv> {
LocalEnv::init(init_conf, force)
.context("materialize initial neon_local environment on disk")?;
Ok(LocalEnv::load_config(&local_env::base_path())
.expect("freshly written config should be loadable"))
Ok(LocalEnv::load_config().expect("freshly written config should be loadable"))
}
/// The default pageserver is the one where CLI tenant/timeline operations are sent by default.

View File

@@ -42,8 +42,8 @@ pub struct LocalEnv {
// compute endpoints).
//
// This is not stored in the config file. Rather, this is the path where the
// config file itself is. It is read from the NEON_REPO_DIR env variable which
// must be an absolute path. If the env var is not set, $PWD/.neon is used.
// config file itself is. It is read from the NEON_REPO_DIR env variable or
// '.neon' if not given.
pub base_data_dir: PathBuf,
// Path to postgres distribution. It's expected that "bin", "include",
@@ -431,7 +431,9 @@ impl LocalEnv {
}
/// Construct `Self` from on-disk state.
pub fn load_config(repopath: &Path) -> anyhow::Result<Self> {
pub fn load_config() -> anyhow::Result<Self> {
let repopath = base_path();
if !repopath.exists() {
bail!(
"Neon config is not found in {}. You need to run 'neon_local init' first",
@@ -459,7 +461,7 @@ impl LocalEnv {
branch_name_mappings,
} = on_disk_config;
LocalEnv {
base_data_dir: repopath.to_owned(),
base_data_dir: repopath.clone(),
pg_distrib_dir,
neon_distrib_dir,
default_tenant_id,
@@ -480,7 +482,7 @@ impl LocalEnv {
"we ensure this during deserialization"
);
env.pageservers = {
let iter = std::fs::read_dir(repopath).context("open dir")?;
let iter = std::fs::read_dir(&repopath).context("open dir")?;
let mut pageservers = Vec::new();
for res in iter {
let dentry = res?;
@@ -717,25 +719,10 @@ impl LocalEnv {
}
pub fn base_path() -> PathBuf {
let path = match std::env::var_os("NEON_REPO_DIR") {
Some(val) => {
let path = PathBuf::from(val);
if !path.is_absolute() {
// repeat the env var in the error because our default is always absolute
panic!("NEON_REPO_DIR must be an absolute path, got {path:?}");
}
path
}
None => {
let pwd = std::env::current_dir()
// technically this can fail but it's quite unlikeley
.expect("determine current directory");
let pwd_abs = pwd.canonicalize().expect("canonicalize current directory");
pwd_abs.join(".neon")
}
};
assert!(path.is_absolute());
path
match std::env::var_os("NEON_REPO_DIR") {
Some(val) => PathBuf::from(val),
None => PathBuf::from(".neon"),
}
}
/// Generate a public/private key pair for JWT authentication

View File

@@ -383,10 +383,12 @@ impl PageServerNode {
.map(|x| x.parse::<AuxFilePolicy>())
.transpose()
.context("Failed to parse 'switch_aux_file_policy'")?,
lsn_lease_length: settings.remove("lsn_lease_length").map(|x| x.to_string()),
lsn_lease_length_for_ts: settings
.remove("lsn_lease_length_for_ts")
.map(|x| x.to_string()),
#[cfg(feature = "testing")]
test_vm_bit_debug_logging: settings
.remove("test_vm_bit_debug_logging")
.map(|x| x.parse::<bool>())
.transpose()
.context("Failed to parse 'test_vm_bit_debug_logging' as bool")?,
};
if !settings.is_empty() {
bail!("Unrecognized tenant settings: {settings:?}")
@@ -510,10 +512,12 @@ impl PageServerNode {
.map(|x| x.parse::<AuxFilePolicy>())
.transpose()
.context("Failed to parse 'switch_aux_file_policy'")?,
lsn_lease_length: settings.remove("lsn_lease_length").map(|x| x.to_string()),
lsn_lease_length_for_ts: settings
.remove("lsn_lease_length_for_ts")
.map(|x| x.to_string()),
#[cfg(feature = "testing")]
test_vm_bit_debug_logging: settings
.remove("test_vm_bit_debug_logging")
.map(|x| x.parse::<bool>())
.transpose()
.context("Failed to parse 'test_vm_bit_debug_logging' as bool")?,
}
};

View File

@@ -14,7 +14,6 @@ use camino::Utf8PathBuf;
use postgres_connection::PgConnectionConfig;
use reqwest::{IntoUrl, Method};
use thiserror::Error;
use utils::auth::{Claims, Scope};
use utils::{http::error::HttpErrorBody, id::NodeId};
use crate::{
@@ -198,7 +197,7 @@ impl SafekeeperNode {
&datadir,
&self.env.safekeeper_bin(),
&args,
self.safekeeper_env_variables()?,
[],
background_process::InitialPidFile::Expect(self.pid_file()),
|| async {
match self.check_status().await {
@@ -211,18 +210,6 @@ impl SafekeeperNode {
.await
}
fn safekeeper_env_variables(&self) -> anyhow::Result<Vec<(String, String)>> {
// Generate a token to connect from safekeeper to peers
if self.conf.auth_enabled {
let token = self
.env
.generate_auth_token(&Claims::new(None, Scope::SafekeeperData))?;
Ok(vec![("SAFEKEEPER_AUTH_TOKEN".to_owned(), token)])
} else {
Ok(Vec::new())
}
}
///
/// Stop the server.
///

View File

@@ -46,7 +46,6 @@ const STORAGE_CONTROLLER_POSTGRES_VERSION: u32 = 16;
pub struct AttachHookRequest {
pub tenant_shard_id: TenantShardId,
pub node_id: Option<NodeId>,
pub generation_override: Option<i32>,
}
#[derive(Serialize, Deserialize)]
@@ -314,17 +313,15 @@ impl StorageController {
args.push(format!("--split-threshold={split_threshold}"))
}
args.push(format!(
"--neon-local-repo-dir={}",
self.env.base_data_dir.display()
));
background_process::start_process(
COMMAND,
&self.env.base_data_dir,
&self.env.storage_controller_bin(),
args,
[],
[(
"NEON_REPO_DIR".to_string(),
self.env.base_data_dir.to_string_lossy().to_string(),
)],
background_process::InitialPidFile::Create(self.pid_file()),
|| async {
match self.ready().await {
@@ -443,7 +440,6 @@ impl StorageController {
let request = AttachHookRequest {
tenant_shard_id,
node_id: Some(pageserver_id),
generation_override: None,
};
let response = self

View File

@@ -786,15 +786,18 @@ async fn main() -> anyhow::Result<()> {
anyhow::bail!("Drain requested for node which doesn't exist.")
}
node_to_fill_descs.retain(|desc| {
matches!(desc.availability, NodeAvailabilityWrapper::Active)
&& matches!(
desc.scheduling,
NodeSchedulingPolicy::Active | NodeSchedulingPolicy::Filling
)
});
let can_fill = node_to_fill_descs
.iter()
.filter(|desc| {
matches!(desc.availability, NodeAvailabilityWrapper::Active)
&& matches!(
desc.scheduling,
NodeSchedulingPolicy::Active | NodeSchedulingPolicy::Filling
)
})
.any(|_| true);
if node_to_fill_descs.is_empty() {
if !can_fill {
anyhow::bail!("There are no nodes to drain to")
}

View File

@@ -194,7 +194,6 @@ services:
- compute
neon-test-extensions:
profiles: ["test-extensions"]
image: ${REPOSITORY:-neondatabase}/neon-test-extensions-v${PG_TEST_VERSION:-16}:${TAG:-latest}
entrypoint:
- "/bin/bash"

View File

@@ -15,6 +15,7 @@ set -eux -o pipefail
COMPOSE_FILE='docker-compose.yml'
cd $(dirname $0)
docker compose -f $COMPOSE_FILE
COMPUTE_CONTAINER_NAME=docker-compose-compute-1
TEST_CONTAINER_NAME=docker-compose-neon-test-extensions-1
PSQL_OPTION="-h localhost -U cloud_admin -p 55433 -d postgres"
@@ -25,16 +26,16 @@ export http_proxy https_proxy
cleanup() {
echo "show container information"
docker ps
docker compose --profile test-extensions -f $COMPOSE_FILE logs
docker compose -f $COMPOSE_FILE logs
echo "stop containers..."
docker compose --profile test-extensions -f $COMPOSE_FILE down
docker compose -f $COMPOSE_FILE down
}
for pg_version in 14 15 16; do
echo "clean up containers if exists"
cleanup
PG_TEST_VERSION=$(($pg_version < 16 ? 16 : $pg_version))
PG_VERSION=$pg_version PG_TEST_VERSION=$PG_TEST_VERSION docker compose --profile test-extensions -f $COMPOSE_FILE up --build -d
PG_VERSION=$pg_version PG_TEST_VERSION=$PG_TEST_VERSION docker compose -f $COMPOSE_FILE up --build -d
echo "wait until the compute is ready. timeout after 60s. "
cnt=0
@@ -46,7 +47,7 @@ for pg_version in 14 15 16; do
cleanup
exit 1
fi
if docker compose --profile test-extensions -f $COMPOSE_FILE logs "compute_is_ready" | grep -q "accepting connections"; then
if docker compose -f $COMPOSE_FILE logs "compute_is_ready" | grep -q "accepting connections"; then
echo "OK. The compute is ready to connect."
echo "execute simple queries."
docker exec $COMPUTE_CONTAINER_NAME /bin/bash -c "psql $PSQL_OPTION"

View File

@@ -11,28 +11,15 @@ page server. We currently use the same binary for both, with --wal-redo runtime
the WAL redo mode. Some PostgreSQL changes are needed in the compute node, while others are just for
the WAL redo process.
In addition to core PostgreSQL changes, there is a Neon extension in the pgxn/neon directory that
hooks into the smgr interface, and rmgr extension in pgxn/neon_rmgr. The extensions are loaded into
the Postgres processes with shared_preload_libraries. Most of the Neon-specific code is in the
extensions, and for any new features, that is preferred over modifying core PostgreSQL code.
In addition to core PostgreSQL changes, there is a Neon extension in contrib/neon, to hook into the
smgr interface. Once all the core changes have been submitted to upstream or eliminated some other
way, the extension could live outside the postgres repository and build against vanilla PostgreSQL.
Below is a list of all the PostgreSQL source code changes, categorized into changes needed for
compute, and changes needed for the WAL redo process:
# Changes for Compute node
## Prefetching
There are changes in many places to perform prefetching, for example for sequential scans. Neon
doesn't benefit from OS readahead, and the latency to pageservers is quite high compared to local
disk, so prefetching is critical for performance, also for sequential scans.
### How to get rid of the patch
Upcoming "streaming read" work in v17 might simplify this. And async I/O work in v18 will hopefully
do more.
## Add t_cid to heap WAL records
```
@@ -50,11 +37,54 @@ The problem is that the XLOG_HEAP_INSERT record does not include the command id
Bite the bullet and submit the patch to PostgreSQL, to add the t_cid to the WAL records. It makes the WAL records larger, which could make this unpopular in the PostgreSQL community. However, it might simplify some logical decoding code; Andres Freund briefly mentioned in PGCon 2022 discussion on Heikki's Neon presentation that logical decoding currently needs to jump through some hoops to reconstruct the same information.
Update from Heikki (2024-04-17): I tried to write an upstream patch for that, to use the t_cid field for logical decoding, but it was not as straightforward as it first sounded.
### Alternatives
Perhaps we could write an extra WAL record with the t_cid information, when a page is evicted that contains rows that were touched a transaction that's still running. However, that seems very complicated.
## ginfast.c
```
diff --git a/src/backend/access/gin/ginfast.c b/src/backend/access/gin/ginfast.c
index e0d9940946..2d964c02e9 100644
--- a/src/backend/access/gin/ginfast.c
+++ b/src/backend/access/gin/ginfast.c
@@ -285,6 +285,17 @@ ginHeapTupleFastInsert(GinState *ginstate, GinTupleCollector *collector)
memset(&sublist, 0, sizeof(GinMetaPageData));
makeSublist(index, collector->tuples, collector->ntuples, &sublist);
+ if (metadata->head != InvalidBlockNumber)
+ {
+ /*
+ * ZENITH: Get buffer before XLogBeginInsert() to avoid recursive call
+ * of XLogBeginInsert(). Reading a new buffer might evict a dirty page from
+ * the buffer cache, and if that page happens to be an FSM or VM page, zenith_write()
+ * will try to WAL-log an image of the page.
+ */
+ buffer = ReadBuffer(index, metadata->tail);
+ }
+
if (needWal)
XLogBeginInsert();
@@ -316,7 +327,6 @@ ginHeapTupleFastInsert(GinState *ginstate, GinTupleCollector *collector)
data.prevTail = metadata->tail;
data.newRightlink = sublist.head;
- buffer = ReadBuffer(index, metadata->tail);
LockBuffer(buffer, GIN_EXCLUSIVE);
page = BufferGetPage(buffer);
```
The problem is explained in the comment above
### How to get rid of the patch
Can we stop WAL-logging FSM or VM pages? Or delay the WAL logging until we're out of the critical
section or something.
Maybe some bigger rewrite of FSM and VM would help to avoid WAL-logging FSM and VM page images?
## Mark index builds that use buffer manager without logging explicitly
```
@@ -65,8 +95,6 @@ Perhaps we could write an extra WAL record with the t_cid information, when a pa
also some changes in src/backend/storage/smgr/smgr.c
```
pgvector 0.6.0 also needs a similar change, which would be very nice to get rid of too.
When a GIN index is built, for example, it is built by inserting the entries into the index more or
less normally, but without WAL-logging anything. After the index has been built, we iterate through
all pages and write them to the WAL. That doesn't work for Neon, because if a page is not WAL-logged
@@ -81,10 +109,6 @@ an operation: `smgr_start_unlogged_build`, `smgr_finish_unlogged_build_phase_1`
I think it would make sense to be more explicit about that in PostgreSQL too. So extract these
changes to a patch and post to pgsql-hackers.
Perhaps we could deduce that an unlogged index build has started when we see a page being evicted
with zero LSN. How to be sure it's an unlogged index build rather than a bug? Currently we have a
check for that and PANIC if we see page with zero LSN being evicted. And how do we detect when the
index build has finished? See https://github.com/neondatabase/neon/pull/7440 for an attempt at that.
## Track last-written page LSN
@@ -116,6 +140,57 @@ The old method is still available, though.
Wait until v15?
## Cache relation sizes
The Neon extension contains a little cache for smgrnblocks() and smgrexists() calls, to avoid going
to the page server every time. It might be useful to cache those in PostgreSQL, maybe in the
relcache? (I think we do cache nblocks in relcache already, check why that's not good enough for
Neon)
## Use buffer manager when extending VM or FSM
```
src/backend/storage/freespace/freespace.c | 14 +-
src/backend/access/heap/visibilitymap.c | 15 +-
diff --git a/src/backend/access/heap/visibilitymap.c b/src/backend/access/heap/visibilitymap.c
index e198df65d8..addfe93eac 100644
--- a/src/backend/access/heap/visibilitymap.c
+++ b/src/backend/access/heap/visibilitymap.c
@@ -652,10 +652,19 @@ vm_extend(Relation rel, BlockNumber vm_nblocks)
/* Now extend the file */
while (vm_nblocks_now < vm_nblocks)
{
- PageSetChecksumInplace((Page) pg.data, vm_nblocks_now);
+ /*
+ * ZENITH: Initialize VM pages through buffer cache to prevent loading
+ * them from pageserver.
+ */
+ Buffer buffer = ReadBufferExtended(rel, VISIBILITYMAP_FORKNUM, P_NEW,
+ RBM_ZERO_AND_LOCK, NULL);
+ Page page = BufferGetPage(buffer);
+
+ PageInit((Page) page, BLCKSZ, 0);
+ PageSetChecksumInplace(page, vm_nblocks_now);
+ MarkBufferDirty(buffer);
+ UnlockReleaseBuffer(buffer);
- smgrextend(rel->rd_smgr, VISIBILITYMAP_FORKNUM, vm_nblocks_now,
- pg.data, false);
vm_nblocks_now++;
}
```
### Problem we're trying to solve
???
### How to get rid of the patch
Maybe this would be a reasonable change in PostgreSQL too?
## Allow startup without reading checkpoint record
In Neon, the compute node is stateless. So when we are launching compute node, we need to provide
@@ -156,7 +231,7 @@ index 0415df9ccb..9f9db3c8bc 100644
* crash we can lose (skip over) as many values as we pre-logged.
*/
-#define SEQ_LOG_VALS 32
+/* Neon XXX: to ensure sequence order of sequence in Zenith we need to WAL log each sequence update. */
+/* Zenith XXX: to ensure sequence order of sequence in Zenith we need to WAL log each sequence update. */
+/* #define SEQ_LOG_VALS 32 */
+#define SEQ_LOG_VALS 0
```
@@ -175,6 +250,66 @@ would be weird if the sequence moved backwards though, think of PITR.
Or add a GUC for the amount to prefix to PostgreSQL, and force it to 1 in Neon.
## Walproposer
```
src/Makefile | 1 +
src/backend/replication/libpqwalproposer/Makefile | 37 +
src/backend/replication/libpqwalproposer/libpqwalproposer.c | 416 ++++++++++++
src/backend/postmaster/bgworker.c | 4 +
src/backend/postmaster/postmaster.c | 6 +
src/backend/replication/Makefile | 4 +-
src/backend/replication/walproposer.c | 2350 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
src/backend/replication/walproposer_utils.c | 402 +++++++++++
src/backend/replication/walreceiver.c | 7 +
src/backend/replication/walsender.c | 320 ++++++---
src/backend/storage/ipc/ipci.c | 6 +
src/include/replication/walproposer.h | 565 ++++++++++++++++
```
WAL proposer is communicating with safekeeper and ensures WAL durability by quorum writes. It is
currently implemented as patch to standard WAL sender.
### How to get rid of the patch
Refactor into an extension. Submit hooks or APIs into upstream if necessary.
@MMeent did some work on this already: https://github.com/neondatabase/postgres/pull/96
## Ignore unexpected data beyond EOF in bufmgr.c
```
@@ -922,11 +928,14 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
*/
bufBlock = isLocalBuf ? LocalBufHdrGetBlock(bufHdr) : BufHdrGetBlock(bufHdr);
if (!PageIsNew((Page) bufBlock))
- ereport(ERROR,
+ {
+ // XXX-ZENITH
+ MemSet((char *) bufBlock, 0, BLCKSZ);
+ ereport(DEBUG1,
(errmsg("unexpected data beyond EOF in block %u of relation %s",
blockNum, relpath(smgr->smgr_rnode, forkNum)),
errhint("This has been seen to occur with buggy kernels; consider updating your system.")));
-
+ }
/*
* We *must* do smgrextend before succeeding, else the page will not
* be reserved by the kernel, and the next P_NEW call will decide to
```
PostgreSQL is a bit sloppy with extending relations. Usually, the relation is extended with zeros
first, then the page is filled, and finally the new page WAL-logged. But if multiple backends extend
a relation at the same time, the pages can be WAL-logged in different order.
I'm not sure what scenario exactly required this change in Neon, though.
### How to get rid of the patch
Submit patches to pgsql-hackers, to tighten up the WAL-logging around relation extension. It's a bit
confusing even in PostgreSQL. Maybe WAL log the intention to extend first, then extend the relation,
and finally WAL-log that the extension succeeded.
## Make smgr interface available to extensions
```
@@ -186,8 +321,6 @@ Or add a GUC for the amount to prefix to PostgreSQL, and force it to 1 in Neon.
Submit to upstream. This could be useful for the Disk Encryption patches too, or for compression.
We have submitted this to upstream, but it's moving at glacial a speed.
https://commitfest.postgresql.org/47/4428/
## Added relpersistence argument to smgropen()
@@ -311,148 +444,6 @@ Ignore it. This is only needed for disaster recovery, so once we've eliminated a
patches, we can just keep it around as a patch or as separate branch in a repo.
## pg_waldump flags to ignore errors
After creating a new project or branch in Neon, the first timeline can begin in the middle of a WAL segment. pg_waldump chokes on that, so we added some flags to make it possible to ignore errors.
### How to get rid of the patch
Like previous one, ignore it.
## Backpressure if pageserver doesn't ingest WAL fast enough
```
@@ -3200,6 +3202,7 @@ ProcessInterrupts(void)
return;
InterruptPending = false;
+retry:
if (ProcDiePending)
{
ProcDiePending = false;
@@ -3447,6 +3450,13 @@ ProcessInterrupts(void)
if (ParallelApplyMessagePending)
HandleParallelApplyMessages();
+
+ /* Call registered callback if any */
+ if (ProcessInterruptsCallback)
+ {
+ if (ProcessInterruptsCallback())
+ goto retry;
+ }
}
```
### How to get rid of the patch
Submit a patch to upstream, for a hook in ProcessInterrupts. Could be useful for other extensions
too.
## SLRU on-demand download
```
src/backend/access/transam/slru.c | 105 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-------------
1 file changed, 92 insertions(+), 13 deletions(-)
```
### Problem we're trying to solve
Previously, SLRU files were included in the basebackup, but the total size of them can be large,
several GB, and downloading them all made the startup time too long.
### Alternatives
FUSE hook or LD_PRELOAD trick to intercept the reads on SLRU files
## WAL-log an all-zeros page as one large hole
- In XLogRecordAssemble()
### Problem we're trying to solve
This change was made in v16. Starting with v16, when PostgreSQL extends a relation, it first extends
it with zeros, and it can extend the relation more than one block at a time. The all-zeros page is WAL-ogged, but it's very wasteful to include 8 kB of zeros in the WAL for that. This hack was made so that we WAL logged a compact record with a whole-page "hole". However, PostgreSQL has assertions that prevent that such WAL records from being replayed, so this breaks compatibility such that unmodified PostreSQL cannot process Neon-generated WAL.
### How to get rid of the patch
Find another compact representation for a full-page image of an all-zeros page. A compressed image perhaps.
## Shut down walproposer after checkpointer
```
+ /* Neon: Also allow walproposer background worker to be treated like a WAL sender, so that it's shut down last */
+ if ((bp->bkend_type == BACKEND_TYPE_NORMAL || bp->bkend_type == BACKEND_TYPE_BGWORKER) &&
```
This changes was needed so that postmaster shuts down the walproposer process only after the shutdown checkpoint record is written. Otherwise, the shutdown record will never make it to the safekeepers.
### How to get rid of the patch
Do a bigger refactoring of the postmaster state machine, such that a background worker can specify
the shutdown ordering by itself. The postmaster state machine has grown pretty complicated, and
would benefit from a refactoring for the sake of readability anyway.
## EXPLAIN changes for prefetch and LFC
### How to get rid of the patch
Konstantin submitted a patch to -hackers already: https://commitfest.postgresql.org/47/4643/. Get that into a committable state.
## On-demand download of extensions
### How to get rid of the patch
FUSE or LD_PRELOAD trickery to intercept reads?
## Publication superuser checks
We have hacked CreatePublication so that also neon_superuser can create them.
### How to get rid of the patch
Create an upstream patch with more fine-grained privileges for publications CREATE/DROP that can be GRANTed to users.
## WAL log replication slots
### How to get rid of the patch
Utilize the upcoming v17 "slot sync worker", or a similar neon-specific background worker process, to periodically WAL-log the slots, or to export them somewhere else.
## WAL-log replication snapshots
### How to get rid of the patch
WAL-log them periodically, from a backgound worker.
## WAL-log relmapper files
Similarly to replications snapshot files, the CID mapping files generated during VACUUM FULL of a catalog table are WAL-logged
### How to get rid of the patch
WAL-log them periodically, from a backgound worker.
## XLogWaitForReplayOf()
??
# Not currently committed but proposed
## Disable ring buffer buffer manager strategies
@@ -481,10 +472,23 @@ hint bits are set. Wal logging hint bits updates requires FPI which significantl
Add special WAL record for setting page hints.
## Prefetching
### Why?
As far as pages in Neon are loaded on demand, to reduce node startup time
and also speedup some massive queries we need some mechanism for bulk loading to
reduce page request round-trip overhead.
Currently Postgres is supporting prefetching only for bitmap scan.
In Neon we should also use prefetch for sequential and index scans, because the OS is not doing it for us.
For sequential scan we could prefetch some number of following pages. For index scan we could prefetch pages
of heap relation addressed by TIDs.
## Prewarming
### Why?
Short downtime (or, in other words, fast compute node restart time) is one of the key feature of Neon.
Short downtime (or, in other words, fast compute node restart time) is one of the key feature of Zenith.
But overhead of request-response round-trip for loading pages on demand can make started node warm-up quite slow.
We can capture state of compute node buffer cache and send bulk request for this pages at startup.

View File

@@ -4,18 +4,18 @@
Currently we build two main images:
- [neondatabase/neon](https://hub.docker.com/repository/docker/neondatabase/neon) — image with pre-built `pageserver`, `safekeeper` and `proxy` binaries and all the required runtime dependencies. Built from [/Dockerfile](/Dockerfile).
- [neondatabase/compute-node-v16](https://hub.docker.com/repository/docker/neondatabase/compute-node-v16) — compute node image with pre-built Postgres binaries from [neondatabase/postgres](https://github.com/neondatabase/postgres). Similar images exist for v15 and v14.
- [neondatabase/neon](https://hub.docker.com/repository/docker/zenithdb/zenith) — image with pre-built `pageserver`, `safekeeper` and `proxy` binaries and all the required runtime dependencies. Built from [/Dockerfile](/Dockerfile).
- [neondatabase/compute-node](https://hub.docker.com/repository/docker/zenithdb/compute-node) — compute node image with pre-built Postgres binaries from [neondatabase/postgres](https://github.com/neondatabase/postgres).
And additional intermediate image:
- [neondatabase/compute-tools](https://hub.docker.com/repository/docker/neondatabase/compute-tools) — compute node configuration management tools.
## Build pipeline
## Building pipeline
We build all images after a successful `release` tests run and push automatically to Docker Hub with two parallel CI jobs
1. `neondatabase/compute-tools` and `neondatabase/compute-node-v16` (and -v15 and -v14)
1. `neondatabase/compute-tools` and `neondatabase/compute-node`
2. `neondatabase/neon`
@@ -34,12 +34,12 @@ You can see a [docker compose](https://docs.docker.com/compose/) example to crea
1. create containers
You can specify version of neon cluster using following environment values.
- PG_VERSION: postgres version for compute (default is 16 as of this writing)
- TAG: the tag version of [docker image](https://registry.hub.docker.com/r/neondatabase/neon/tags), which is tagged in [CI test](/.github/workflows/build_and_test.yml). Default is 'latest'
- PG_VERSION: postgres version for compute (default is 16)
- TAG: the tag version of [docker image](https://registry.hub.docker.com/r/neondatabase/neon/tags) (default is latest), which is tagged in [CI test](/.github/workflows/build_and_test.yml)
```
$ cd docker-compose/
$ docker-compose down # remove the containers if exists
$ PG_VERSION=16 TAG=latest docker-compose up --build -d # You can specify the postgres and image version
$ PG_VERSION=16 TAG=2937 docker-compose up --build -d # You can specify the postgres and image version
Creating network "dockercompose_default" with the default driver
Creating docker-compose_storage_broker_1 ... done
(...omit...)
@@ -47,31 +47,29 @@ Creating docker-compose_storage_broker_1 ... done
2. connect compute node
```
$ psql postgresql://cloud_admin:cloud_admin@localhost:55433/postgres
psql (16.3)
Type "help" for help.
$ echo "localhost:55433:postgres:cloud_admin:cloud_admin" >> ~/.pgpass
$ chmod 600 ~/.pgpass
$ psql -h localhost -p 55433 -U cloud_admin
postgres=# CREATE TABLE t(key int primary key, value text);
CREATE TABLE
postgres=# insert into t values(1, 1);
postgres=# insert into t values(1,1);
INSERT 0 1
postgres=# select * from t;
key | value
key | value
-----+-------
1 | 1
(1 row)
```
3. If you want to see the log, you can use `docker-compose logs` command.
```
# check the container name you want to see
$ docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
3582f6d76227 docker-compose_compute "/shell/compute.sh" 2 minutes ago Up 2 minutes 0.0.0.0:3080->3080/tcp, :::3080->3080/tcp, 0.0.0.0:55433->55433/tcp, :::55433->55433/tcp docker-compose_compute_1
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
d6968a5ae912 dockercompose_compute "/shell/compute.sh" 5 minutes ago Up 5 minutes 0.0.0.0:3080->3080/tcp, 0.0.0.0:55433->55433/tcp dockercompose_compute_1
(...omit...)
$ docker logs -f docker-compose_compute_1
$ docker logs -f dockercompose_compute_1
2022-10-21 06:15:48.757 GMT [56] LOG: connection authorized: user=cloud_admin database=postgres application_name=psql
2022-10-21 06:17:00.307 GMT [56] LOG: [NEON_SMGR] libpagestore: connected to 'host=pageserver port=6400'
(...omit...)

View File

@@ -101,12 +101,11 @@ or
```toml
[remote_storage]
container_name = 'some-container-name'
storage_account = 'somestorageaccnt'
container_region = 'us-east'
prefix_in_container = '/test-prefix/'
```
The `AZURE_STORAGE_ACCESS_KEY` env variable can be used to specify the azure credentials if needed.
`AZURE_STORAGE_ACCOUNT` and `AZURE_STORAGE_ACCESS_KEY` env variables can be used to specify the azure credentials if needed.
## Repository background tasks

View File

@@ -4,6 +4,10 @@ version = "0.1.0"
edition.workspace = true
license.workspace = true
[features]
default = []
testing = []
[dependencies]
serde.workspace = true
serde_with.workspace = true

View File

@@ -209,7 +209,6 @@ pub enum NodeSchedulingPolicy {
Active,
Filling,
Pause,
PauseForRestart,
Draining,
}
@@ -221,7 +220,6 @@ impl FromStr for NodeSchedulingPolicy {
"active" => Ok(Self::Active),
"filling" => Ok(Self::Filling),
"pause" => Ok(Self::Pause),
"pause_for_restart" => Ok(Self::PauseForRestart),
"draining" => Ok(Self::Draining),
_ => Err(anyhow::anyhow!("Unknown scheduling state '{s}'")),
}
@@ -235,7 +233,6 @@ impl From<NodeSchedulingPolicy> for String {
Active => "active",
Filling => "filling",
Pause => "pause",
PauseForRestart => "pause_for_restart",
Draining => "draining",
}
.to_string()

View File

@@ -558,12 +558,6 @@ impl KeySpaceRandomAccum {
self.ranges.push(range);
}
pub fn add_keyspace(&mut self, keyspace: KeySpace) {
for range in keyspace.ranges {
self.add_range(range);
}
}
pub fn to_keyspace(mut self) -> KeySpace {
let mut ranges = Vec::new();
if !self.ranges.is_empty() {

View File

@@ -177,20 +177,6 @@ serde_with::serde_conv!(
|value: String| -> Result<_, humantime::TimestampError> { humantime::parse_rfc3339(&value) }
);
impl LsnLease {
/// The default length for an explicit LSN lease request (10 minutes).
pub const DEFAULT_LENGTH: Duration = Duration::from_secs(10 * 60);
/// The default length for an implicit LSN lease granted during
/// `get_lsn_by_timestamp` request (1 minutes).
pub const DEFAULT_LENGTH_FOR_TS: Duration = Duration::from_secs(60);
/// Checks whether the lease is expired.
pub fn is_expired(&self, now: &SystemTime) -> bool {
now > &self.valid_until
}
}
/// The only [`TenantState`] variants we could be `TenantState::Activating` from.
#[derive(Clone, Copy, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub enum ActivatingFrom {
@@ -336,8 +322,8 @@ pub struct TenantConfig {
pub timeline_get_throttle: Option<ThrottleConfig>,
pub image_layer_creation_check_threshold: Option<u8>,
pub switch_aux_file_policy: Option<AuxFilePolicy>,
pub lsn_lease_length: Option<String>,
pub lsn_lease_length_for_ts: Option<String>,
#[cfg(feature = "testing")]
pub test_vm_bit_debug_logging: Option<bool>,
}
/// The policy for the aux file storage. It can be switched through `switch_aux_file_policy`

View File

@@ -14,7 +14,7 @@ aws-config.workspace = true
aws-sdk-s3.workspace = true
aws-credential-types.workspace = true
bytes.workspace = true
camino = { workspace = true, features = [ "serde1" ] }
camino.workspace = true
humantime.workspace = true
hyper = { workspace = true, features = ["stream"] }
futures.workspace = true

View File

@@ -54,10 +54,7 @@ impl AzureBlobStorage {
azure_config.container_name
);
// Use the storage account from the config by default, fall back to env var if not present.
let account = azure_config.storage_account.clone().unwrap_or_else(|| {
env::var("AZURE_STORAGE_ACCOUNT").expect("missing AZURE_STORAGE_ACCOUNT")
});
let account = env::var("AZURE_STORAGE_ACCOUNT").expect("missing AZURE_STORAGE_ACCOUNT");
// If the `AZURE_STORAGE_ACCESS_KEY` env var has an access key, use that,
// otherwise try the token based credentials.

View File

@@ -27,7 +27,7 @@ use std::{
time::{Duration, SystemTime},
};
use anyhow::Context;
use anyhow::{bail, Context};
use aws_sdk_s3::types::StorageClass;
use camino::{Utf8Path, Utf8PathBuf};
@@ -36,6 +36,7 @@ use futures::stream::Stream;
use serde::{Deserialize, Serialize};
use tokio::sync::Semaphore;
use tokio_util::sync::CancellationToken;
use toml_edit::Item;
use tracing::info;
pub use self::{
@@ -465,11 +466,7 @@ impl GenericRemoteStorage {
Self::AwsS3(Arc::new(S3Bucket::new(s3_config, timeout)?))
}
RemoteStorageKind::AzureContainer(azure_config) => {
let storage_account = azure_config
.storage_account
.as_deref()
.unwrap_or("<AZURE_STORAGE_ACCOUNT>");
info!("Using azure container '{}' in account '{storage_account}' in region '{}' as a remote storage, prefix in container: '{:?}'",
info!("Using azure container '{}' in region '{}' as a remote storage, prefix in container: '{:?}'",
azure_config.container_name, azure_config.container_region, azure_config.prefix_in_container);
Self::AzureBlob(Arc::new(AzureBlobStorage::new(azure_config, timeout)?))
}
@@ -592,8 +589,6 @@ impl Debug for S3Config {
pub struct AzureConfig {
/// Name of the container to connect to.
pub container_name: String,
/// Name of the storage account the container is inside of
pub storage_account: Option<String>,
/// The region where the bucket is located at.
pub container_region: String,
/// A "subfolder" in the container, to use the same container separately by multiple remote storage users at once.
@@ -608,9 +603,8 @@ impl Debug for AzureConfig {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("AzureConfig")
.field("bucket_name", &self.container_name)
.field("storage_account", &self.storage_account)
.field("bucket_region", &self.container_region)
.field("prefix_in_container", &self.prefix_in_container)
.field("prefix_in_bucket", &self.prefix_in_container)
.field("concurrency_limit", &self.concurrency_limit)
.field(
"max_keys_per_list_response",
@@ -620,158 +614,58 @@ impl Debug for AzureConfig {
}
}
struct RemoteStorageConfigDeserializeVisitor;
impl RemoteStorageConfig {
pub const DEFAULT_TIMEOUT: Duration = std::time::Duration::from_secs(120);
impl<'de> serde::de::Visitor<'de> for RemoteStorageConfigDeserializeVisitor {
type Value = RemoteStorageConfig;
fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
formatter.write_str("a RemoteStorageConfig")
}
fn visit_map<A>(self, mut map: A) -> Result<Self::Value, A::Error>
where
A: serde::de::MapAccess<'de>,
{
let mut local_path: Option<Utf8PathBuf> = None;
let mut bucket_name = None;
let mut bucket_region = None;
let mut prefix_in_bucket = None;
let mut container_name = None;
let mut storage_account = None;
let mut container_region = None;
let mut prefix_in_container = None;
let mut concurrency_limit = None;
let mut max_keys_per_list_response = None;
let mut upload_storage_class: Option<StorageClass> = None;
let mut endpoint = None;
let mut timeout: Option<Duration> = None;
while let Some(key) = map.next_key::<String>()? {
match key.as_str() {
"local_path" => {
if local_path.is_some() {
return Err(serde::de::Error::duplicate_field("local_path"));
}
local_path = Some(map.next_value()?);
}
"bucket_name" => {
if bucket_name.is_some() {
return Err(serde::de::Error::duplicate_field("bucket_name"));
}
bucket_name = Some(map.next_value()?);
}
"bucket_region" => {
if bucket_region.is_some() {
return Err(serde::de::Error::duplicate_field("bucket_region"));
}
bucket_region = Some(map.next_value()?);
}
"prefix_in_bucket" => {
if prefix_in_bucket.is_some() {
return Err(serde::de::Error::duplicate_field("prefix_in_bucket"));
}
prefix_in_bucket = Some(map.next_value()?);
}
"container_name" => {
if container_name.is_some() {
return Err(serde::de::Error::duplicate_field("container_name"));
}
container_name = Some(map.next_value()?);
}
"storage_account" => {
if storage_account.is_some() {
return Err(serde::de::Error::duplicate_field("storage_account"));
}
storage_account = map.next_value()?;
}
"container_region" => {
if container_region.is_some() {
return Err(serde::de::Error::duplicate_field("container_region"));
}
container_region = Some(map.next_value()?);
}
"prefix_in_container" => {
if prefix_in_container.is_some() {
return Err(serde::de::Error::duplicate_field("prefix_in_container"));
}
prefix_in_container = Some(map.next_value()?);
}
"concurrency_limit" => {
if concurrency_limit.is_some() {
return Err(serde::de::Error::duplicate_field("concurrency_limit"));
}
concurrency_limit = Some(map.next_value()?);
}
"max_keys_per_list_response" => {
if max_keys_per_list_response.is_some() {
return Err(serde::de::Error::duplicate_field(
"max_keys_per_list_response",
));
}
max_keys_per_list_response = Some(map.next_value()?);
}
"upload_storage_class" => {
if upload_storage_class.is_some() {
return Err(serde::de::Error::duplicate_field("upload_storage_class"));
}
let s = map.next_value::<String>()?;
let v = StorageClass::from_str(&s).expect("infallible");
#[allow(deprecated)]
if matches!(v, StorageClass::Unknown(_)) {
let values = format!("{:?}", StorageClass::values());
return Err(serde::de::Error::invalid_value(
serde::de::Unexpected::Str(&s),
&values.as_str(),
));
}
upload_storage_class = Some(v);
}
"endpoint" => {
if endpoint.is_some() {
return Err(serde::de::Error::duplicate_field("endpoint"));
}
endpoint = Some(map.next_value()?);
}
"timeout" => {
if timeout.is_some() {
return Err(serde::de::Error::duplicate_field("timeout"));
}
let s = map.next_value::<String>()?;
let d = humantime::parse_duration(&s)
.map_err(|e| format!("invalid `timeout`: {e}"))
.map_err(serde::de::Error::custom)?;
timeout = Some(d);
}
field => {
return Err(serde::de::Error::custom(format!("unknown field {field:?}")));
}
}
}
pub fn from_toml(toml: &toml_edit::Item) -> anyhow::Result<Option<RemoteStorageConfig>> {
let local_path = toml.get("local_path");
let bucket_name = toml.get("bucket_name");
let bucket_region = toml.get("bucket_region");
let container_name = toml.get("container_name");
let container_region = toml.get("container_region");
let use_azure = container_name.is_some() && container_region.is_some();
let concurrency_limit = {
let default = if use_azure {
DEFAULT_REMOTE_STORAGE_AZURE_CONCURRENCY_LIMIT
} else {
DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT
};
concurrency_limit
.unwrap_or(NonZeroUsize::new(default).expect("defaults should be valid"))
let default_concurrency_limit = if use_azure {
DEFAULT_REMOTE_STORAGE_AZURE_CONCURRENCY_LIMIT
} else {
DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT
};
let concurrency_limit = NonZeroUsize::new(
parse_optional_integer("concurrency_limit", toml)?.unwrap_or(default_concurrency_limit),
)
.context("Failed to parse 'concurrency_limit' as a positive integer")?;
let max_keys_per_list_response =
max_keys_per_list_response.unwrap_or(DEFAULT_MAX_KEYS_PER_LIST_RESPONSE);
parse_optional_integer::<i32, _>("max_keys_per_list_response", toml)
.context("Failed to parse 'max_keys_per_list_response' as a positive integer")?
.or(DEFAULT_MAX_KEYS_PER_LIST_RESPONSE);
let timeout = {
let timeout = timeout.unwrap_or(RemoteStorageConfig::DEFAULT_TIMEOUT);
if timeout < Duration::from_secs(1) {
return Err(serde::de::Error::custom(format!(
"timeout was specified as {timeout:?} which is too low"
)));
}
timeout
};
let endpoint = toml
.get("endpoint")
.map(|endpoint| parse_toml_string("endpoint", endpoint))
.transpose()?;
let timeout = toml
.get("timeout")
.map(|timeout| {
timeout
.as_str()
.ok_or_else(|| anyhow::Error::msg("timeout was not a string"))
})
.transpose()
.and_then(|timeout| {
timeout
.map(humantime::parse_duration)
.transpose()
.map_err(anyhow::Error::new)
})
.context("parse timeout")?
.unwrap_or(Self::DEFAULT_TIMEOUT);
if timeout < Duration::from_secs(1) {
bail!("timeout was specified as {timeout:?} which is too low");
}
let storage = match (
local_path,
@@ -780,73 +674,99 @@ impl<'de> serde::de::Visitor<'de> for RemoteStorageConfigDeserializeVisitor {
container_name,
container_region,
) {
(None, None, None, None, None) => Err(serde::de::Error::custom(
"one or more mandatory fields not specified",
)),
(_, Some(_), None, ..) => Err(serde::de::Error::custom(
"'bucket_region' option is mandatory if 'bucket_name' is given ",
)),
(_, None, Some(_), ..) => Err(serde::de::Error::custom(
"'bucket_name' option is mandatory if 'bucket_region' is given ",
)),
// no 'local_path' nor 'bucket_name' options are provided, consider this remote storage disabled
(None, None, None, None, None) => return Ok(None),
(_, Some(_), None, ..) => {
bail!("'bucket_region' option is mandatory if 'bucket_name' is given ")
}
(_, None, Some(_), ..) => {
bail!("'bucket_name' option is mandatory if 'bucket_region' is given ")
}
(None, Some(bucket_name), Some(bucket_region), ..) => {
Ok(RemoteStorageKind::AwsS3(S3Config {
bucket_name,
bucket_region,
prefix_in_bucket,
RemoteStorageKind::AwsS3(S3Config {
bucket_name: parse_toml_string("bucket_name", bucket_name)?,
bucket_region: parse_toml_string("bucket_region", bucket_region)?,
prefix_in_bucket: toml
.get("prefix_in_bucket")
.map(|prefix_in_bucket| {
parse_toml_string("prefix_in_bucket", prefix_in_bucket)
})
.transpose()?,
endpoint,
concurrency_limit,
max_keys_per_list_response,
upload_storage_class,
}))
upload_storage_class: toml
.get("upload_storage_class")
.map(|prefix_in_bucket| -> anyhow::Result<_> {
let s = parse_toml_string("upload_storage_class", prefix_in_bucket)?;
let storage_class = StorageClass::from_str(&s).expect("infallible");
#[allow(deprecated)]
if matches!(storage_class, StorageClass::Unknown(_)) {
bail!("Specified storage class unknown to SDK: '{s}'. Allowed values: {:?}", StorageClass::values());
}
Ok(storage_class)
})
.transpose()?,
})
}
(_, _, _, Some(_), None) => {
bail!("'container_name' option is mandatory if 'container_region' is given ")
}
(_, _, _, None, Some(_)) => {
bail!("'container_name' option is mandatory if 'container_region' is given ")
}
(_, _, _, Some(_), None) => Err(serde::de::Error::custom(
"'container_name' option is mandatory if 'container_region' is given ",
)),
(_, _, _, None, Some(_)) => Err(serde::de::Error::custom(
"'container_name' option is mandatory if 'container_region' is given ",
)),
(None, None, None, Some(container_name), Some(container_region)) => {
Ok(RemoteStorageKind::AzureContainer(AzureConfig {
container_name,
storage_account,
container_region,
prefix_in_container,
RemoteStorageKind::AzureContainer(AzureConfig {
container_name: parse_toml_string("container_name", container_name)?,
container_region: parse_toml_string("container_region", container_region)?,
prefix_in_container: toml
.get("prefix_in_container")
.map(|prefix_in_container| {
parse_toml_string("prefix_in_container", prefix_in_container)
})
.transpose()?,
concurrency_limit,
max_keys_per_list_response,
}))
})
}
(Some(local_path), None, None, None, None) => {
Ok(RemoteStorageKind::LocalFs(local_path))
(Some(local_path), None, None, None, None) => RemoteStorageKind::LocalFs(
Utf8PathBuf::from(parse_toml_string("local_path", local_path)?),
),
(Some(_), Some(_), ..) => {
bail!("'local_path' and 'bucket_name' are mutually exclusive")
}
(Some(_), Some(_), ..) => Err(serde::de::Error::custom(
"'local_path' and 'bucket_name' are mutually exclusive",
)),
(Some(_), _, _, Some(_), Some(_)) => Err(serde::de::Error::custom(
"local_path and 'container_name' are mutually exclusive",
)),
}?;
(Some(_), _, _, Some(_), Some(_)) => {
bail!("local_path and 'container_name' are mutually exclusive")
}
};
Ok(RemoteStorageConfig { storage, timeout })
Ok(Some(RemoteStorageConfig { storage, timeout }))
}
}
impl<'de> serde::Deserialize<'de> for RemoteStorageConfig {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
deserializer.deserialize_map(RemoteStorageConfigDeserializeVisitor)
}
// Helper functions to parse a toml Item
fn parse_optional_integer<I, E>(name: &str, item: &toml_edit::Item) -> anyhow::Result<Option<I>>
where
I: TryFrom<i64, Error = E>,
E: std::error::Error + Send + Sync + 'static,
{
let toml_integer = match item.get(name) {
Some(item) => item
.as_integer()
.with_context(|| format!("configure option {name} is not an integer"))?,
None => return Ok(None),
};
I::try_from(toml_integer)
.map(Some)
.with_context(|| format!("configure option {name} is too large"))
}
impl RemoteStorageConfig {
pub const DEFAULT_TIMEOUT: Duration = std::time::Duration::from_secs(120);
pub fn from_toml(toml: &toml_edit::Item) -> anyhow::Result<Option<RemoteStorageConfig>> {
let toml = toml.to_string();
Ok(toml_edit::de::from_str(&toml)?)
}
fn parse_toml_string(name: &str, item: &Item) -> anyhow::Result<String> {
let s = item
.as_str()
.with_context(|| format!("configure option {name} is not a string"))?;
Ok(s.to_string())
}
struct ConcurrencyLimiter {

View File

@@ -212,7 +212,6 @@ fn create_azure_client(
let remote_storage_config = RemoteStorageConfig {
storage: RemoteStorageKind::AzureContainer(AzureConfig {
container_name: remote_storage_azure_container,
storage_account: None,
container_region: remote_storage_azure_region,
prefix_in_container: Some(format!("test_{millis}_{random:08x}/")),
concurrency_limit: NonZeroUsize::new(100).unwrap(),

View File

@@ -7,7 +7,7 @@ license.workspace = true
[dependencies]
hyper.workspace = true
opentelemetry = { workspace = true, features=["rt-tokio"] }
opentelemetry-otlp = { workspace = true, default-features=false, features = ["http-proto", "trace", "http", "reqwest-client"] }
opentelemetry-otlp = { workspace = true, default_features=false, features = ["http-proto", "trace", "http", "reqwest-client"] }
opentelemetry-semantic-conventions.workspace = true
reqwest = { workspace = true, default-features = false, features = ["rustls-tls"] }
tokio = { workspace = true, features = ["rt", "rt-multi-thread"] }

View File

@@ -25,8 +25,6 @@ pub struct Config {
///
/// For simplicity, this value must be greater than or equal to `memory_history_len`.
memory_history_log_interval: usize,
/// The max number of iterations to skip before logging the next iteration
memory_history_log_noskip_interval: Duration,
}
impl Default for Config {
@@ -35,7 +33,6 @@ impl Default for Config {
memory_poll_interval: Duration::from_millis(100),
memory_history_len: 5, // use 500ms of history for decision-making
memory_history_log_interval: 20, // but only log every ~2s (otherwise it's spammy)
memory_history_log_noskip_interval: Duration::from_secs(15), // but only if it's changed, or 60 seconds have passed
}
}
}
@@ -88,12 +85,7 @@ impl CgroupWatcher {
// buffer for samples that will be logged. once full, it remains so.
let history_log_len = self.config.memory_history_log_interval;
let max_skip = self.config.memory_history_log_noskip_interval;
let mut history_log_buf = vec![MemoryStatus::zeroed(); history_log_len];
let mut last_logged_memusage = MemoryStatus::zeroed();
// Ensure that we're tracking a value that's definitely in the past, as Instant::now is only guaranteed to be non-decreasing on Rust's T1-supported systems.
let mut can_skip_logs_until = Instant::now() - max_skip;
for t in 0_u64.. {
ticker.tick().await;
@@ -123,24 +115,12 @@ impl CgroupWatcher {
// equal to the logging interval, we can just log the entire buffer every time we set
// the last entry, which also means that for this log line, we can ignore that it's a
// ring buffer (because all the entries are in order of increasing time).
//
// We skip logging the data if data hasn't meaningfully changed in a while, unless
// we've already ignored previous iterations for the last max_skip period.
if i == history_log_len - 1
&& (now > can_skip_logs_until
|| !history_log_buf
.iter()
.all(|usage| last_logged_memusage.status_is_close_or_similar(usage)))
{
if i == history_log_len - 1 {
info!(
history = ?MemoryStatus::debug_slice(&history_log_buf),
summary = ?summary,
"Recent cgroup memory statistics history"
);
can_skip_logs_until = now + max_skip;
last_logged_memusage = *history_log_buf.last().unwrap();
}
updates
@@ -252,24 +232,6 @@ impl MemoryStatus {
DS(slice)
}
/// Check if the other memory status is a close or similar result.
/// Returns true if the larger value is not larger than the smaller value
/// by 1/8 of the smaller value, and within 128MiB.
/// See tests::check_similarity_behaviour for examples of behaviour
fn status_is_close_or_similar(&self, other: &MemoryStatus) -> bool {
let margin;
let diff;
if self.non_reclaimable >= other.non_reclaimable {
margin = other.non_reclaimable / 8;
diff = self.non_reclaimable - other.non_reclaimable;
} else {
margin = self.non_reclaimable / 8;
diff = other.non_reclaimable - self.non_reclaimable;
}
diff < margin && diff < 128 * 1024 * 1024
}
}
#[cfg(test)]
@@ -299,65 +261,4 @@ mod tests {
assert_eq!(values(2, 4), [9, 0, 1, 2]);
assert_eq!(values(2, 10), [3, 4, 5, 6, 7, 8, 9, 0, 1, 2]);
}
#[test]
fn check_similarity_behaviour() {
// This all accesses private methods, so we can't actually run this
// as doctests, because doctests run as an external crate.
let mut small = super::MemoryStatus {
non_reclaimable: 1024,
};
let mut large = super::MemoryStatus {
non_reclaimable: 1024 * 1024 * 1024 * 1024,
};
// objects are self-similar, no matter the size
assert!(small.status_is_close_or_similar(&small));
assert!(large.status_is_close_or_similar(&large));
// inequality is symmetric
assert!(!small.status_is_close_or_similar(&large));
assert!(!large.status_is_close_or_similar(&small));
small.non_reclaimable = 64;
large.non_reclaimable = (small.non_reclaimable / 8) * 9;
// objects are self-similar, no matter the size
assert!(small.status_is_close_or_similar(&small));
assert!(large.status_is_close_or_similar(&large));
// values are similar if the larger value is larger by less than
// 12.5%, i.e. 1/8 of the smaller value.
// In the example above, large is exactly 12.5% larger, so this doesn't
// match.
assert!(!small.status_is_close_or_similar(&large));
assert!(!large.status_is_close_or_similar(&small));
large.non_reclaimable -= 1;
assert!(large.status_is_close_or_similar(&large));
assert!(small.status_is_close_or_similar(&large));
assert!(large.status_is_close_or_similar(&small));
// The 1/8 rule only applies up to 128MiB of difference
small.non_reclaimable = 1024 * 1024 * 1024 * 1024;
large.non_reclaimable = small.non_reclaimable / 8 * 9;
assert!(small.status_is_close_or_similar(&small));
assert!(large.status_is_close_or_similar(&large));
assert!(!small.status_is_close_or_similar(&large));
assert!(!large.status_is_close_or_similar(&small));
// the large value is put just above the threshold
large.non_reclaimable = small.non_reclaimable + 128 * 1024 * 1024;
assert!(large.status_is_close_or_similar(&large));
assert!(!small.status_is_close_or_similar(&large));
assert!(!large.status_is_close_or_similar(&small));
// now below
large.non_reclaimable -= 1;
assert!(large.status_is_close_or_similar(&large));
assert!(small.status_is_close_or_similar(&large));
assert!(large.status_is_close_or_similar(&small));
}
}

View File

@@ -12,11 +12,11 @@ use futures::{
stream::{SplitSink, SplitStream},
SinkExt, StreamExt,
};
use tracing::{debug, info};
use tracing::info;
use crate::protocol::{
OutboundMsg, OutboundMsgKind, ProtocolRange, ProtocolResponse, ProtocolVersion,
PROTOCOL_MAX_VERSION, PROTOCOL_MIN_VERSION,
OutboundMsg, ProtocolRange, ProtocolResponse, ProtocolVersion, PROTOCOL_MAX_VERSION,
PROTOCOL_MIN_VERSION,
};
/// The central handler for all communications in the monitor.
@@ -118,12 +118,7 @@ impl Dispatcher {
/// serialize the wrong thing and send it, since `self.sink.send` will take
/// any string.
pub async fn send(&mut self, message: OutboundMsg) -> anyhow::Result<()> {
if matches!(&message.inner, OutboundMsgKind::HealthCheck { .. }) {
debug!(?message, "sending message");
} else {
info!(?message, "sending message");
}
info!(?message, "sending message");
let json = serde_json::to_string(&message).context("failed to serialize message")?;
self.sink
.send(Message::Text(json))

View File

@@ -12,7 +12,7 @@ use axum::extract::ws::{Message, WebSocket};
use futures::StreamExt;
use tokio::sync::{broadcast, watch};
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, warn};
use tracing::{error, info, warn};
use crate::cgroup::{self, CgroupWatcher};
use crate::dispatcher::Dispatcher;
@@ -474,29 +474,26 @@ impl Runner {
// there is a message from the agent
msg = self.dispatcher.source.next() => {
if let Some(msg) = msg {
match &msg {
// Don't use 'message' as a key as the string also uses
// that for its key
info!(?msg, "received message");
match msg {
Ok(msg) => {
let message: InboundMsg = match msg {
Message::Text(text) => {
serde_json::from_str(text).context("failed to deserialize text message")?
serde_json::from_str(&text).context("failed to deserialize text message")?
}
other => {
warn!(
// Don't use 'message' as a key as the
// string also uses that for its key
msg = ?other,
"problem processing incoming message: agent should only send text messages but received different type"
"agent should only send text messages but received different type"
);
continue
},
};
if matches!(&message.inner, InboundMsgKind::HealthCheck { .. }) {
debug!(?msg, "received message");
} else {
info!(?msg, "received message");
}
let out = match self.process_message(message.clone()).await {
Ok(Some(out)) => out,
Ok(None) => continue,
@@ -520,11 +517,7 @@ impl Runner {
.await
.context("failed to send message")?;
}
Err(e) => warn!(
error = format!("{e}"),
msg = ?msg,
"received error message"
),
Err(e) => warn!("{e}"),
}
} else {
anyhow::bail!("dispatcher connection closed")

View File

@@ -8,7 +8,7 @@ license.workspace = true
default = []
# Enables test-only APIs, incuding failpoints. In particular, enables the `fail_point!` macro,
# which adds some runtime cost to run tests on outage conditions
testing = ["fail/failpoints"]
testing = ["fail/failpoints", "pageserver_api/testing"]
[dependencies]
anyhow.workspace = true

View File

@@ -2,9 +2,10 @@
//! and push them to a HTTP endpoint.
use crate::context::{DownloadBehavior, RequestContext};
use crate::task_mgr::{self, TaskKind, BACKGROUND_RUNTIME};
use crate::tenant::size::CalculateSyntheticSizeError;
use crate::tenant::tasks::BackgroundLoopKind;
use crate::tenant::{mgr::TenantManager, LogicalSizeCalculationCause, Tenant};
use crate::tenant::{
mgr::TenantManager, LogicalSizeCalculationCause, PageReconstructError, Tenant,
};
use camino::Utf8PathBuf;
use consumption_metrics::EventType;
use pageserver_api::models::TenantState;
@@ -349,12 +350,19 @@ async fn calculate_and_log(tenant: &Tenant, cancel: &CancellationToken, ctx: &Re
// Same for the loop that fetches computed metrics.
// By using the same limiter, we centralize metrics collection for "start" and "finished" counters,
// which turns out is really handy to understand the system.
match tenant.calculate_synthetic_size(CAUSE, cancel, ctx).await {
Ok(_) => {}
Err(CalculateSyntheticSizeError::Cancelled) => {}
Err(e) => {
let tenant_shard_id = tenant.tenant_shard_id();
error!("failed to calculate synthetic size for tenant {tenant_shard_id}: {e:#}");
}
let Err(e) = tenant.calculate_synthetic_size(CAUSE, cancel, ctx).await else {
return;
};
// this error can be returned if timeline is shutting down, but it does not
// mean the synthetic size worker should terminate.
let shutting_down = matches!(
e.downcast_ref::<PageReconstructError>(),
Some(PageReconstructError::Cancelled)
);
if !shutting_down {
let tenant_shard_id = tenant.tenant_shard_id();
error!("failed to calculate synthetic size for tenant {tenant_shard_id}: {e:#}");
}
}

View File

@@ -1135,10 +1135,7 @@ async fn tenant_size_handler(
&ctx,
)
.await
.map_err(|e| match e {
crate::tenant::size::CalculateSyntheticSizeError::Cancelled => ApiError::ShuttingDown,
other => ApiError::InternalServerError(anyhow::anyhow!(other)),
})?;
.map_err(ApiError::InternalServerError)?;
let mut sizes = None;
let accepts_html = headers
@@ -1146,7 +1143,9 @@ async fn tenant_size_handler(
.map(|v| v == "text/html")
.unwrap_or_default();
if !inputs_only.unwrap_or(false) {
let storage_model = inputs.calculate_model();
let storage_model = inputs
.calculate_model()
.map_err(ApiError::InternalServerError)?;
let size = storage_model.calculate();
// If request header expects html, return html
@@ -1730,7 +1729,7 @@ async fn lsn_lease_handler(
active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id)
.await?;
let result = timeline
.make_lsn_lease(lsn, timeline.get_lsn_lease_length(), &ctx)
.make_lsn_lease(lsn, &ctx)
.map_err(|e| ApiError::InternalServerError(e.context("lsn lease http handler")))?;
json_response(StatusCode::OK, result)

View File

@@ -935,7 +935,7 @@ impl PageServerHandler {
let timeline = self
.get_active_tenant_timeline(tenant_shard_id.tenant_id, timeline_id, shard_selector)
.await?;
let lease = timeline.make_lsn_lease(lsn, timeline.get_lsn_lease_length(), ctx)?;
let lease = timeline.make_lsn_lease(lsn, ctx)?;
let valid_until = lease
.valid_until
.duration_since(SystemTime::UNIX_EPOCH)

View File

@@ -919,14 +919,6 @@ impl Timeline {
result.add_key(AUX_FILES_KEY);
}
#[cfg(test)]
{
let guard = self.extra_test_dense_keyspace.load();
for kr in &guard.ranges {
result.add_range(kr.clone());
}
}
Ok((
result.to_keyspace(),
/* AUX sparse key space */

View File

@@ -240,7 +240,6 @@ pub struct GcResult {
pub layers_needed_by_cutoff: u64,
pub layers_needed_by_pitr: u64,
pub layers_needed_by_branches: u64,
pub layers_needed_by_leases: u64,
pub layers_not_updated: u64,
pub layers_removed: u64, // # of layer files removed because they have been made obsolete by newer ondisk files.
@@ -270,7 +269,6 @@ impl AddAssign for GcResult {
self.layers_needed_by_pitr += other.layers_needed_by_pitr;
self.layers_needed_by_cutoff += other.layers_needed_by_cutoff;
self.layers_needed_by_branches += other.layers_needed_by_branches;
self.layers_needed_by_leases += other.layers_needed_by_leases;
self.layers_not_updated += other.layers_not_updated;
self.layers_removed += other.layers_removed;

View File

@@ -31,7 +31,6 @@ use remote_storage::DownloadError;
use remote_storage::GenericRemoteStorage;
use remote_storage::TimeoutOrCancel;
use std::fmt;
use std::time::SystemTime;
use storage_broker::BrokerClientChannel;
use tokio::io::BufReader;
use tokio::sync::watch;
@@ -66,9 +65,9 @@ use self::timeline::uninit::TimelineCreateGuard;
use self::timeline::uninit::TimelineExclusionError;
use self::timeline::uninit::UninitializedTimeline;
use self::timeline::EvictionTaskTenantState;
use self::timeline::GcCutoffs;
use self::timeline::TimelineResources;
use self::timeline::WaitLsnError;
use self::timeline::{GcCutoffs, GcInfo};
use crate::config::PageServerConf;
use crate::context::{DownloadBehavior, RequestContext};
use crate::deletion_queue::DeletionQueueClient;
@@ -510,24 +509,11 @@ pub(crate) enum GcError {
#[error(transparent)]
Remote(anyhow::Error),
// An error reading while calculating GC cutoffs
#[error(transparent)]
GcCutoffs(PageReconstructError),
// If GC was invoked for a particular timeline, this error means it didn't exist
#[error("timeline not found")]
TimelineNotFound,
}
impl From<PageReconstructError> for GcError {
fn from(value: PageReconstructError) -> Self {
match value {
PageReconstructError::Cancelled => Self::TimelineCancelled,
other => Self::GcCutoffs(other),
}
}
}
impl Tenant {
/// Yet another helper for timeline initialization.
///
@@ -1047,6 +1033,7 @@ impl Tenant {
remote_metadata,
TimelineResources {
remote_client,
deletion_queue_client: self.deletion_queue_client.clone(),
timeline_get_throttle: self.timeline_get_throttle.clone(),
},
ctx,
@@ -1072,6 +1059,7 @@ impl Tenant {
timeline_id,
&index_part.metadata,
remote_timeline_client,
self.deletion_queue_client.clone(),
)
.instrument(tracing::info_span!("timeline_delete", %timeline_id))
.await
@@ -2429,13 +2417,6 @@ impl Tenant {
}
}
pub fn get_lsn_lease_length(&self) -> Duration {
let tenant_conf = self.tenant_conf.load().tenant_conf.clone();
tenant_conf
.lsn_lease_length
.unwrap_or(self.conf.default_tenant_conf.lsn_lease_length)
}
pub fn set_new_tenant_config(&self, new_tenant_conf: TenantConfOpt) {
// Use read-copy-update in order to avoid overwriting the location config
// state if this races with [`Tenant::set_new_location_config`]. Note that
@@ -2940,9 +2921,17 @@ impl Tenant {
.checked_sub(horizon)
.unwrap_or(Lsn(0));
let cutoffs = timeline.find_gc_cutoffs(cutoff, pitr, cancel, ctx).await?;
let old = gc_cutoffs.insert(timeline.timeline_id, cutoffs);
assert!(old.is_none());
let res = timeline.find_gc_cutoffs(cutoff, pitr, cancel, ctx).await;
match res {
Ok(cutoffs) => {
let old = gc_cutoffs.insert(timeline.timeline_id, cutoffs);
assert!(old.is_none());
}
Err(e) => {
tracing::warn!(timeline_id = %timeline.timeline_id, "ignoring failure to find gc cutoffs: {e:#}");
}
}
}
if !self.is_active() || self.cancel.is_cancelled() {
@@ -3018,13 +3007,12 @@ impl Tenant {
{
let mut target = timeline.gc_info.write().unwrap();
let now = SystemTime::now();
target.leases.retain(|_, lease| !lease.is_expired(&now));
match gc_cutoffs.remove(&timeline.timeline_id) {
Some(cutoffs) => {
target.retain_lsns = branchpoints;
target.cutoffs = cutoffs;
*target = GcInfo {
retain_lsns: branchpoints,
cutoffs,
};
}
None => {
// reasons for this being unavailable:
@@ -3455,6 +3443,7 @@ impl Tenant {
);
TimelineResources {
remote_client,
deletion_queue_client: self.deletion_queue_client.clone(),
timeline_get_throttle: self.timeline_get_throttle.clone(),
}
}
@@ -3564,7 +3553,7 @@ impl Tenant {
cause: LogicalSizeCalculationCause,
cancel: &CancellationToken,
ctx: &RequestContext,
) -> Result<size::ModelInputs, size::CalculateSyntheticSizeError> {
) -> anyhow::Result<size::ModelInputs> {
let logical_sizes_at_once = self
.conf
.concurrent_tenant_size_logical_size_queries
@@ -3579,8 +3568,8 @@ impl Tenant {
// See more for on the issue #2748 condenced out of the initial PR review.
let mut shared_cache = tokio::select! {
locked = self.cached_logical_sizes.lock() => locked,
_ = cancel.cancelled() => return Err(size::CalculateSyntheticSizeError::Cancelled),
_ = self.cancel.cancelled() => return Err(size::CalculateSyntheticSizeError::Cancelled),
_ = cancel.cancelled() => anyhow::bail!("cancelled"),
_ = self.cancel.cancelled() => anyhow::bail!("tenant is shutting down"),
};
size::gather_inputs(
@@ -3604,10 +3593,10 @@ impl Tenant {
cause: LogicalSizeCalculationCause,
cancel: &CancellationToken,
ctx: &RequestContext,
) -> Result<u64, size::CalculateSyntheticSizeError> {
) -> anyhow::Result<u64> {
let inputs = self.gather_size_inputs(None, cause, cancel, ctx).await?;
let size = inputs.calculate();
let size = inputs.calculate()?;
self.set_cached_synthetic_size(size);
@@ -3842,8 +3831,8 @@ pub(crate) mod harness {
tenant_conf.image_layer_creation_check_threshold,
),
switch_aux_file_policy: Some(tenant_conf.switch_aux_file_policy),
lsn_lease_length: Some(tenant_conf.lsn_lease_length),
lsn_lease_length_for_ts: Some(tenant_conf.lsn_lease_length_for_ts),
#[cfg(feature = "testing")]
test_vm_bit_debug_logging: Some(tenant_conf.test_vm_bit_debug_logging),
}
}
}
@@ -4054,7 +4043,6 @@ mod tests {
use crate::repository::{Key, Value};
use crate::tenant::harness::*;
use crate::tenant::timeline::CompactFlags;
use crate::walrecord::NeonWalRecord;
use crate::DEFAULT_PG_VERSION;
use bytes::{Bytes, BytesMut};
use hex_literal::hex;
@@ -5278,9 +5266,6 @@ mod tests {
let cancel = CancellationToken::new();
let mut test_key = Key::from_hex("010000000033333333444444445500000000").unwrap();
let mut test_key_end = test_key;
test_key_end.field6 = NUM_KEYS as u32;
tline.add_extra_test_dense_keyspace(KeySpace::single(test_key..test_key_end));
let mut keyspace = KeySpaceAccum::new();
@@ -6240,8 +6225,8 @@ mod tests {
let cancel = CancellationToken::new();
let base_key = Key::from_hex("620000000033333333444444445500000000").unwrap();
assert_eq!(base_key.field1, AUX_KEY_PREFIX); // in case someone accidentally changed the prefix...
let mut base_key = Key::from_hex("000000000033333333444444445500000000").unwrap();
base_key.field1 = AUX_KEY_PREFIX;
let mut test_key = base_key;
let mut lsn = Lsn(0x10);
@@ -6346,7 +6331,6 @@ mod tests {
Lsn(0x20), // it's fine to not advance LSN to 0x30 while using 0x30 to get below because `get_vectored_impl` does not wait for LSN
)
.await?;
tline.add_extra_test_dense_keyspace(KeySpace::single(base_key..(base_key_nonexist.next())));
let child = tenant
.branch_timeline_test_with_layers(
@@ -6719,8 +6703,8 @@ mod tests {
}
#[tokio::test]
async fn test_simple_bottom_most_compaction_images() -> anyhow::Result<()> {
let harness = TenantHarness::create("test_simple_bottom_most_compaction_images")?;
async fn test_simple_bottom_most_compaction() -> anyhow::Result<()> {
let harness = TenantHarness::create("test_simple_bottom_most_compaction")?;
let (tenant, ctx) = harness.load().await;
fn get_key(id: u32) -> Key {
@@ -6875,168 +6859,4 @@ mod tests {
Ok(())
}
#[tokio::test]
async fn test_neon_test_record() -> anyhow::Result<()> {
let harness = TenantHarness::create("test_neon_test_record")?;
let (tenant, ctx) = harness.load().await;
fn get_key(id: u32) -> Key {
// using aux key here b/c they are guaranteed to be inside `collect_keyspace`.
let mut key = Key::from_hex("620000000033333333444444445500000000").unwrap();
key.field6 = id;
key
}
let delta1 = vec![
(
get_key(1),
Lsn(0x20),
Value::WalRecord(NeonWalRecord::wal_append(",0x20")),
),
(
get_key(1),
Lsn(0x30),
Value::WalRecord(NeonWalRecord::wal_append(",0x30")),
),
(get_key(2), Lsn(0x10), Value::Image("0x10".into())),
(
get_key(2),
Lsn(0x20),
Value::WalRecord(NeonWalRecord::wal_append(",0x20")),
),
(
get_key(2),
Lsn(0x30),
Value::WalRecord(NeonWalRecord::wal_append(",0x30")),
),
(get_key(3), Lsn(0x10), Value::Image("0x10".into())),
(
get_key(3),
Lsn(0x20),
Value::WalRecord(NeonWalRecord::wal_clear()),
),
(get_key(4), Lsn(0x10), Value::Image("0x10".into())),
(
get_key(4),
Lsn(0x20),
Value::WalRecord(NeonWalRecord::wal_init()),
),
];
let image1 = vec![(get_key(1), "0x10".into())];
let tline = tenant
.create_test_timeline_with_layers(
TIMELINE_ID,
Lsn(0x10),
DEFAULT_PG_VERSION,
&ctx,
vec![delta1], // delta layers
vec![(Lsn(0x10), image1)], // image layers
Lsn(0x50),
)
.await?;
assert_eq!(
tline.get(get_key(1), Lsn(0x50), &ctx).await?,
Bytes::from_static(b"0x10,0x20,0x30")
);
assert_eq!(
tline.get(get_key(2), Lsn(0x50), &ctx).await?,
Bytes::from_static(b"0x10,0x20,0x30")
);
// assert_eq!(tline.get(get_key(3), Lsn(0x50), &ctx).await?, Bytes::new());
// assert_eq!(tline.get(get_key(4), Lsn(0x50), &ctx).await?, Bytes::new());
Ok(())
}
#[tokio::test]
async fn test_lsn_lease() -> anyhow::Result<()> {
let (tenant, ctx) = TenantHarness::create("test_lsn_lease")?.load().await;
let key = Key::from_hex("010000000033333333444444445500000000").unwrap();
let end_lsn = Lsn(0x100);
let image_layers = (0x20..=0x90)
.step_by(0x10)
.map(|n| {
(
Lsn(n),
vec![(key, test_img(&format!("data key at {:x}", n)))],
)
})
.collect();
let timeline = tenant
.create_test_timeline_with_layers(
TIMELINE_ID,
Lsn(0x10),
DEFAULT_PG_VERSION,
&ctx,
Vec::new(),
image_layers,
end_lsn,
)
.await?;
let leased_lsns = [0x30, 0x50, 0x70];
let mut leases = Vec::new();
let _: anyhow::Result<_> = leased_lsns.iter().try_for_each(|n| {
leases.push(timeline.make_lsn_lease(Lsn(*n), timeline.get_lsn_lease_length(), &ctx)?);
Ok(())
});
// Renewing with shorter lease should not change the lease.
let updated_lease_0 =
timeline.make_lsn_lease(Lsn(leased_lsns[0]), Duration::from_secs(0), &ctx)?;
assert_eq!(updated_lease_0.valid_until, leases[0].valid_until);
// Renewing with a long lease should renew lease with later expiration time.
let updated_lease_1 = timeline.make_lsn_lease(
Lsn(leased_lsns[1]),
timeline.get_lsn_lease_length() * 2,
&ctx,
)?;
assert!(updated_lease_1.valid_until > leases[1].valid_until);
// Force set disk consistent lsn so we can get the cutoff at `end_lsn`.
info!(
"latest_gc_cutoff_lsn: {}",
*timeline.get_latest_gc_cutoff_lsn()
);
timeline.force_set_disk_consistent_lsn(end_lsn);
let res = tenant
.gc_iteration(
Some(TIMELINE_ID),
0,
Duration::ZERO,
&CancellationToken::new(),
&ctx,
)
.await?;
// Keeping everything <= Lsn(0x80) b/c leases:
// 0/10: initdb layer
// (0/20..=0/70).step_by(0x10): image layers added when creating the timeline.
assert_eq!(res.layers_needed_by_leases, 7);
// Keeping 0/90 b/c it is the latest layer.
assert_eq!(res.layers_not_updated, 1);
// Removed 0/80.
assert_eq!(res.layers_removed, 1);
// Make lease on a already GC-ed LSN.
// 0/80 does not have a valid lease + is below latest_gc_cutoff
assert!(Lsn(0x80) < *timeline.get_latest_gc_cutoff_lsn());
let res = timeline.make_lsn_lease(Lsn(0x80), timeline.get_lsn_lease_length(), &ctx);
assert!(res.is_err());
// Should still be able to renew a currently valid lease
// Assumption: original lease to is still valid for 0/50.
let _ =
timeline.make_lsn_lease(Lsn(leased_lsns[1]), timeline.get_lsn_lease_length(), &ctx)?;
Ok(())
}
}

View File

@@ -13,7 +13,6 @@ use pageserver_api::models::AuxFilePolicy;
use pageserver_api::models::CompactionAlgorithm;
use pageserver_api::models::CompactionAlgorithmSettings;
use pageserver_api::models::EvictionPolicy;
use pageserver_api::models::LsnLease;
use pageserver_api::models::{self, ThrottleConfig};
use pageserver_api::shard::{ShardCount, ShardIdentity, ShardNumber, ShardStripeSize};
use serde::de::IntoDeserializer;
@@ -379,15 +378,8 @@ pub struct TenantConf {
/// file is written.
pub switch_aux_file_policy: AuxFilePolicy,
/// The length for an explicit LSN lease request.
/// Layers needed to reconstruct pages at LSN will not be GC-ed during this interval.
#[serde(with = "humantime_serde")]
pub lsn_lease_length: Duration,
/// The length for an implicit LSN lease granted as part of `get_lsn_by_timestamp` request.
/// Layers needed to reconstruct pages at LSN will not be GC-ed during this interval.
#[serde(with = "humantime_serde")]
pub lsn_lease_length_for_ts: Duration,
#[cfg(feature = "testing")]
pub test_vm_bit_debug_logging: bool,
}
/// Same as TenantConf, but this struct preserves the information about
@@ -488,15 +480,10 @@ pub struct TenantConfOpt {
#[serde(default)]
pub switch_aux_file_policy: Option<AuxFilePolicy>,
#[cfg(feature = "testing")]
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(with = "humantime_serde")]
#[serde(default)]
pub lsn_lease_length: Option<Duration>,
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(with = "humantime_serde")]
#[serde(default)]
pub lsn_lease_length_for_ts: Option<Duration>,
pub test_vm_bit_debug_logging: Option<bool>,
}
impl TenantConfOpt {
@@ -559,12 +546,10 @@ impl TenantConfOpt {
switch_aux_file_policy: self
.switch_aux_file_policy
.unwrap_or(global_conf.switch_aux_file_policy),
lsn_lease_length: self
.lsn_lease_length
.unwrap_or(global_conf.lsn_lease_length),
lsn_lease_length_for_ts: self
.lsn_lease_length_for_ts
.unwrap_or(global_conf.lsn_lease_length_for_ts),
#[cfg(feature = "testing")]
test_vm_bit_debug_logging: self
.test_vm_bit_debug_logging
.unwrap_or(global_conf.test_vm_bit_debug_logging),
}
}
}
@@ -609,8 +594,8 @@ impl Default for TenantConf {
timeline_get_throttle: crate::tenant::throttle::Config::disabled(),
image_layer_creation_check_threshold: DEFAULT_IMAGE_LAYER_CREATION_CHECK_THRESHOLD,
switch_aux_file_policy: AuxFilePolicy::default_tenant_config(),
lsn_lease_length: LsnLease::DEFAULT_LENGTH,
lsn_lease_length_for_ts: LsnLease::DEFAULT_LENGTH_FOR_TS,
#[cfg(feature = "testing")]
test_vm_bit_debug_logging: false,
}
}
}
@@ -686,8 +671,8 @@ impl From<TenantConfOpt> for models::TenantConfig {
timeline_get_throttle: value.timeline_get_throttle.map(ThrottleConfig::from),
image_layer_creation_check_threshold: value.image_layer_creation_check_threshold,
switch_aux_file_policy: value.switch_aux_file_policy,
lsn_lease_length: value.lsn_lease_length.map(humantime),
lsn_lease_length_for_ts: value.lsn_lease_length_for_ts.map(humantime),
#[cfg(feature = "testing")]
test_vm_bit_debug_logging: value.test_vm_bit_debug_logging,
}
}
}

View File

@@ -22,7 +22,7 @@ use async_stream::try_stream;
use byteorder::{ReadBytesExt, BE};
use bytes::{BufMut, Bytes, BytesMut};
use either::Either;
use futures::{Stream, StreamExt};
use futures::Stream;
use hex;
use std::{
cmp::Ordering,
@@ -259,16 +259,6 @@ where
Ok(result)
}
pub fn iter<'a>(
&'a self,
start_key: &'a [u8; L],
ctx: &'a RequestContext,
) -> DiskBtreeIterator<'a> {
DiskBtreeIterator {
stream: Box::pin(self.get_stream_from(start_key, ctx)),
}
}
/// Return a stream which yields all key, value pairs from the index
/// starting from the first key greater or equal to `start_key`.
///
@@ -506,19 +496,6 @@ where
}
}
pub struct DiskBtreeIterator<'a> {
#[allow(clippy::type_complexity)]
stream: std::pin::Pin<
Box<dyn Stream<Item = std::result::Result<(Vec<u8>, u64), DiskBtreeError>> + 'a>,
>,
}
impl<'a> DiskBtreeIterator<'a> {
pub async fn next(&mut self) -> Option<std::result::Result<(Vec<u8>, u64), DiskBtreeError>> {
self.stream.next().await
}
}
///
/// Public builder object, for creating a new tree.
///
@@ -1111,17 +1088,6 @@ pub(crate) mod tests {
== all_data.get(&u128::MAX).cloned()
);
// Test iterator and get_stream API
let mut iter = reader.iter(&[0; 16], &ctx);
let mut cnt = 0;
while let Some(res) = iter.next().await {
let (key, val) = res?;
let key = u128::from_be_bytes(key.as_slice().try_into().unwrap());
assert_eq!(val, *all_data.get(&key).unwrap());
cnt += 1;
}
assert_eq!(cnt, all_data.len());
Ok(())
}

View File

@@ -513,7 +513,7 @@ impl<'a> TenantDownloader<'a> {
// cover our access to local storage.
let Ok(_guard) = self.secondary_state.gate.enter() else {
// Shutting down
return Err(UpdateError::Cancelled);
return Ok(());
};
let tenant_shard_id = self.secondary_state.get_tenant_shard_id();
@@ -846,7 +846,7 @@ impl<'a> TenantDownloader<'a> {
for layer in timeline.layers {
if self.secondary_state.cancel.is_cancelled() {
tracing::debug!("Cancelled -- dropping out of layer loop");
return Err(UpdateError::Cancelled);
return Ok(());
}
// Existing on-disk layers: just update their access time.

View File

@@ -3,6 +3,7 @@ use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use anyhow::{bail, Context};
use tokio::sync::oneshot::error::RecvError;
use tokio::sync::Semaphore;
use tokio_util::sync::CancellationToken;
@@ -10,7 +11,7 @@ use tokio_util::sync::CancellationToken;
use crate::context::RequestContext;
use crate::pgdatadir_mapping::CalculateLogicalSizeError;
use super::{GcError, LogicalSizeCalculationCause, Tenant};
use super::{LogicalSizeCalculationCause, Tenant};
use crate::tenant::Timeline;
use utils::id::TimelineId;
use utils::lsn::Lsn;
@@ -42,40 +43,6 @@ pub struct SegmentMeta {
pub kind: LsnKind,
}
#[derive(thiserror::Error, Debug)]
pub(crate) enum CalculateSyntheticSizeError {
/// Something went wrong internally to the calculation of logical size at a particular branch point
#[error("Failed to calculated logical size on timeline {timeline_id} at {lsn}: {error}")]
LogicalSize {
timeline_id: TimelineId,
lsn: Lsn,
error: CalculateLogicalSizeError,
},
/// Something went wrong internally when calculating GC parameters at start of size calculation
#[error(transparent)]
GcInfo(GcError),
/// Totally unexpected errors, like panics joining a task
#[error(transparent)]
Fatal(anyhow::Error),
/// Tenant shut down while calculating size
#[error("Cancelled")]
Cancelled,
}
impl From<GcError> for CalculateSyntheticSizeError {
fn from(value: GcError) -> Self {
match value {
GcError::TenantCancelled | GcError::TimelineCancelled => {
CalculateSyntheticSizeError::Cancelled
}
other => CalculateSyntheticSizeError::GcInfo(other),
}
}
}
impl SegmentMeta {
fn size_needed(&self) -> bool {
match self.kind {
@@ -149,9 +116,12 @@ pub(super) async fn gather_inputs(
cause: LogicalSizeCalculationCause,
cancel: &CancellationToken,
ctx: &RequestContext,
) -> Result<ModelInputs, CalculateSyntheticSizeError> {
) -> anyhow::Result<ModelInputs> {
// refresh is needed to update gc related pitr_cutoff and horizon_cutoff
tenant.refresh_gc_info(cancel, ctx).await?;
tenant
.refresh_gc_info(cancel, ctx)
.await
.context("Failed to refresh gc_info before gathering inputs")?;
// Collect information about all the timelines
let mut timelines = tenant.list_timelines();
@@ -357,12 +327,6 @@ pub(super) async fn gather_inputs(
)
.await?;
if tenant.cancel.is_cancelled() {
// If we're shutting down, return an error rather than a sparse result that might include some
// timelines from before we started shutting down
return Err(CalculateSyntheticSizeError::Cancelled);
}
Ok(ModelInputs {
segments,
timeline_inputs,
@@ -371,8 +335,9 @@ pub(super) async fn gather_inputs(
/// Augment 'segments' with logical sizes
///
/// This will leave segments' sizes as None if the Timeline associated with the segment is deleted concurrently
/// (i.e. we cannot read its logical size at a particular LSN).
/// this will probably conflict with on-demand downloaded layers, or at least force them all
/// to be downloaded
///
async fn fill_logical_sizes(
timelines: &[Arc<Timeline>],
segments: &mut [SegmentMeta],
@@ -380,7 +345,7 @@ async fn fill_logical_sizes(
logical_size_cache: &mut HashMap<(TimelineId, Lsn), u64>,
cause: LogicalSizeCalculationCause,
ctx: &RequestContext,
) -> Result<(), CalculateSyntheticSizeError> {
) -> anyhow::Result<()> {
let timeline_hash: HashMap<TimelineId, Arc<Timeline>> = HashMap::from_iter(
timelines
.iter()
@@ -422,7 +387,7 @@ async fn fill_logical_sizes(
}
// Perform the size lookups
let mut have_any_error = None;
let mut have_any_error = false;
while let Some(res) = joinset.join_next().await {
// each of these come with Result<anyhow::Result<_>, JoinError>
// because of spawn + spawn_blocking
@@ -433,36 +398,21 @@ async fn fill_logical_sizes(
Err(join_error) => {
// cannot really do anything, as this panic is likely a bug
error!("task that calls spawn_ondemand_logical_size_calculation panicked: {join_error:#}");
have_any_error = Some(CalculateSyntheticSizeError::Fatal(
anyhow::anyhow!(join_error)
.context("task that calls spawn_ondemand_logical_size_calculation"),
));
have_any_error = true;
}
Ok(Err(recv_result_error)) => {
// cannot really do anything, as this panic is likely a bug
error!("failed to receive logical size query result: {recv_result_error:#}");
have_any_error = Some(CalculateSyntheticSizeError::Fatal(
anyhow::anyhow!(recv_result_error)
.context("Receiving logical size query result"),
));
have_any_error = true;
}
Ok(Ok(TimelineAtLsnSizeResult(timeline, lsn, Err(error)))) => {
if matches!(error, CalculateLogicalSizeError::Cancelled) {
// Skip this: it's okay if one timeline among many is shutting down while we
// calculate inputs for the overall tenant.
continue;
} else {
if !matches!(error, CalculateLogicalSizeError::Cancelled) {
warn!(
timeline_id=%timeline.timeline_id,
"failed to calculate logical size at {lsn}: {error:#}"
);
have_any_error = Some(CalculateSyntheticSizeError::LogicalSize {
timeline_id: timeline.timeline_id,
lsn,
error,
});
}
have_any_error = true;
}
Ok(Ok(TimelineAtLsnSizeResult(timeline, lsn, Ok(size)))) => {
debug!(timeline_id=%timeline.timeline_id, %lsn, size, "size calculated");
@@ -476,10 +426,10 @@ async fn fill_logical_sizes(
// prune any keys not needed anymore; we record every used key and added key.
logical_size_cache.retain(|key, _| sizes_needed.contains_key(key));
if let Some(error) = have_any_error {
if have_any_error {
// we cannot complete this round, because we are missing data.
// we have however cached all we were able to request calculation on.
return Err(error);
anyhow::bail!("failed to calculate some logical_sizes");
}
// Insert the looked up sizes to the Segments
@@ -493,28 +443,33 @@ async fn fill_logical_sizes(
if let Some(Some(size)) = sizes_needed.get(&(timeline_id, lsn)) {
seg.segment.size = Some(*size);
} else {
bail!("could not find size at {} in timeline {}", lsn, timeline_id);
}
}
Ok(())
}
impl ModelInputs {
pub fn calculate_model(&self) -> tenant_size_model::StorageModel {
pub fn calculate_model(&self) -> anyhow::Result<tenant_size_model::StorageModel> {
// Convert SegmentMetas into plain Segments
StorageModel {
let storage = StorageModel {
segments: self
.segments
.iter()
.map(|seg| seg.segment.clone())
.collect(),
}
};
Ok(storage)
}
// calculate total project size
pub fn calculate(&self) -> u64 {
let storage = self.calculate_model();
pub fn calculate(&self) -> anyhow::Result<u64> {
let storage = self.calculate_model()?;
let sizes = storage.calculate();
sizes.total_size
Ok(sizes.total_size)
}
}
@@ -701,7 +656,7 @@ fn verify_size_for_multiple_branches() {
"#;
let inputs: ModelInputs = serde_json::from_str(doc).unwrap();
assert_eq!(inputs.calculate(), 37_851_408);
assert_eq!(inputs.calculate().unwrap(), 37_851_408);
}
#[test]
@@ -756,7 +711,7 @@ fn verify_size_for_one_branch() {
let model: ModelInputs = serde_json::from_str(doc).unwrap();
let res = model.calculate_model().calculate();
let res = model.calculate_model().unwrap().calculate();
println!("calculated synthetic size: {}", res.total_size);
println!("result: {:?}", serde_json::to_string(&res.segments));

View File

@@ -318,7 +318,7 @@ pub(crate) struct LayerFringe {
#[derive(Debug)]
struct LayerKeyspace {
layer: ReadableLayer,
target_keyspace: KeySpaceRandomAccum,
target_keyspace: Vec<KeySpace>,
}
impl LayerFringe {
@@ -342,13 +342,17 @@ impl LayerFringe {
_,
LayerKeyspace {
layer,
mut target_keyspace,
target_keyspace,
},
)) => Some((
layer,
target_keyspace.consume_keyspace(),
read_desc.lsn_range,
)),
)) => {
let mut keyspace = KeySpaceRandomAccum::new();
for ks in target_keyspace {
for part in ks.ranges {
keyspace.add_range(part);
}
}
Some((layer, keyspace.consume_keyspace(), read_desc.lsn_range))
}
None => unreachable!("fringe internals are always consistent"),
}
}
@@ -363,18 +367,16 @@ impl LayerFringe {
let entry = self.layers.entry(layer_id.clone());
match entry {
Entry::Occupied(mut entry) => {
entry.get_mut().target_keyspace.add_keyspace(keyspace);
entry.get_mut().target_keyspace.push(keyspace);
}
Entry::Vacant(entry) => {
self.planned_reads_by_lsn.push(ReadDesc {
lsn_range,
layer_id: layer_id.clone(),
});
let mut accum = KeySpaceRandomAccum::new();
accum.add_keyspace(keyspace);
entry.insert(LayerKeyspace {
layer,
target_keyspace: accum,
target_keyspace: vec![keyspace],
});
}
}

View File

@@ -219,6 +219,7 @@ pub struct DeltaLayerInner {
// values copied from summary
index_start_blk: u32,
index_root_blk: u32,
lsn_range: Range<Lsn>,
file: VirtualFile,
file_id: FileId,
@@ -784,6 +785,7 @@ impl DeltaLayerInner {
file_id,
index_start_blk: actual_summary.index_start_blk,
index_root_blk: actual_summary.index_root_blk,
lsn_range: actual_summary.lsn_range,
max_vectored_read_bytes,
}))
}
@@ -909,7 +911,7 @@ impl DeltaLayerInner {
let reads = Self::plan_reads(
&keyspace,
lsn_range.clone(),
lsn_range,
data_end_offset,
index_reader,
planner,
@@ -922,7 +924,7 @@ impl DeltaLayerInner {
self.do_reads_and_update_state(reads, reconstruct_state, ctx)
.await;
reconstruct_state.on_lsn_advanced(&keyspace, lsn_range.start);
reconstruct_state.on_lsn_advanced(&keyspace, self.lsn_range.start);
Ok(())
}

View File

@@ -346,7 +346,6 @@ async fn gc_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
// cutoff specified as time.
let ctx =
RequestContext::todo_child(TaskKind::GarbageCollector, DownloadBehavior::Download);
let mut first = true;
loop {
tokio::select! {
@@ -363,14 +362,6 @@ async fn gc_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
if first {
first = false;
if delay_by_lease_length(tenant.get_lsn_lease_length(), &cancel)
.await
.is_err()
{
break;
}
if random_init_delay(period, &cancel).await.is_err() {
break;
}
@@ -540,21 +531,6 @@ pub(crate) async fn random_init_delay(
}
}
/// Delays GC by defaul lease length at restart.
///
/// We do this as the leases mapping are not persisted to disk. By delaying GC by default
/// length, we gurantees that all the leases we granted before the restart will expire
/// when we run GC for the first time after the restart.
pub(crate) async fn delay_by_lease_length(
length: Duration,
cancel: &CancellationToken,
) -> Result<(), Cancelled> {
match tokio::time::timeout(length, cancel.cancelled()).await {
Ok(_) => Err(Cancelled),
Err(_) => Ok(()),
}
}
/// Attention: the `task` and `period` beocme labels of a pageserver-wide prometheus metric.
pub(crate) fn warn_when_period_overrun(
elapsed: Duration,

View File

@@ -47,6 +47,7 @@ use utils::{
vec_map::VecMap,
};
use std::ops::{Deref, Range};
use std::pin::pin;
use std::sync::atomic::Ordering as AtomicOrdering;
use std::sync::{Arc, Mutex, RwLock, Weak};
@@ -60,12 +61,7 @@ use std::{
cmp::{max, min, Ordering},
ops::ControlFlow,
};
use std::{
collections::btree_map::Entry,
ops::{Deref, Range},
};
use crate::metrics::GetKind;
use crate::pgdatadir_mapping::MAX_AUX_FILE_V2_DELTAS;
use crate::{
aux_file::AuxFileSizeEstimator,
@@ -79,6 +75,7 @@ use crate::{
disk_usage_eviction_task::DiskUsageEvictionInfo,
pgdatadir_mapping::CollectKeySpaceError,
};
use crate::{deletion_queue::DeletionQueueClient, metrics::GetKind};
use crate::{
disk_usage_eviction_task::finite_f32,
tenant::storage_layer::{
@@ -208,6 +205,7 @@ fn drop_wlock<T>(rlock: tokio::sync::RwLockWriteGuard<'_, T>) {
/// The outward-facing resources required to build a Timeline
pub struct TimelineResources {
pub remote_client: RemoteTimelineClient,
pub deletion_queue_client: DeletionQueueClient,
pub timeline_get_throttle: Arc<
crate::tenant::throttle::Throttle<&'static crate::metrics::tenant_throttling::TimelineGet>,
>,
@@ -428,14 +426,6 @@ pub struct Timeline {
/// Indicate whether aux file v2 storage is enabled.
pub(crate) last_aux_file_policy: AtomicAuxFilePolicy,
/// Some test cases directly place keys into the timeline without actually modifying the directory
/// keys (i.e., DB_DIR). The test cases creating such keys will put the keyspaces here, so that
/// these keys won't get garbage-collected during compaction/GC. This field only modifies the dense
/// keyspace return value of `collect_keyspace`. For sparse keyspaces, use AUX keys for testing, and
/// in the future, add `extra_test_sparse_keyspace` if necessary.
#[cfg(test)]
pub(crate) extra_test_dense_keyspace: ArcSwap<KeySpace>,
}
pub struct WalReceiverInfo {
@@ -457,9 +447,6 @@ pub(crate) struct GcInfo {
/// The cutoff coordinates, which are combined by selecting the minimum.
pub(crate) cutoffs: GcCutoffs,
/// Leases granted to particular LSNs.
pub(crate) leases: BTreeMap<Lsn, LsnLease>,
}
impl GcInfo {
@@ -927,16 +914,59 @@ impl Timeline {
let mut reconstruct_state = ValuesReconstructState::new();
// Only add the cached image to the reconstruct state when it exists.
let cached_page_img_lsn = cached_page_img.as_ref().map(|(lsn, _)| *lsn);
if cached_page_img.is_some() {
let mut key_state = VectoredValueReconstructState::default();
key_state.img = cached_page_img;
reconstruct_state.keys.insert(key, Ok(key_state));
}
let debug_log = {
#[cfg(feature = "testing")]
{
self.get_test_vm_bit_debug_logging()
}
#[cfg(not(feature = "testing"))]
{
false
}
};
if debug_log {
tracing::info!(%key, %lsn, ?cached_page_img_lsn, "debug-logging page reconstruction");
}
if debug_log {
tracing::info!(
location = "before vectored get",
"debug-logging page reconstruction"
);
self.layers
.read()
.await
.layer_map()
.dump(false, ctx)
.await
.unwrap();
}
let vectored_res = self
.get_vectored_impl(keyspace.clone(), lsn, &mut reconstruct_state, ctx)
.await;
if debug_log {
tracing::info!(
location = "before validation",
"debug-logging page reconstruction"
);
self.layers
.read()
.await
.layer_map()
.dump(false, ctx)
.await
.unwrap();
}
if self.conf.validate_vectored_get {
self.validate_get_vectored_impl(&vectored_res, keyspace, lsn, ctx)
.await;
@@ -1561,46 +1591,17 @@ impl Timeline {
Ok(())
}
/// Obtains a temporary lease blocking garbage collection for the given LSN.
///
/// This function will error if the requesting LSN is less than the `latest_gc_cutoff_lsn` and there is also
/// no existing lease to renew. If there is an existing lease in the map, the lease will be renewed only if
/// the request extends the lease. The returned lease is therefore the maximum between the existing lease and
/// the requesting lease.
/// Obtains a temporary lease blocking garbage collection for the given LSN
pub(crate) fn make_lsn_lease(
&self,
lsn: Lsn,
length: Duration,
_lsn: Lsn,
_ctx: &RequestContext,
) -> anyhow::Result<LsnLease> {
let lease = {
let mut gc_info = self.gc_info.write().unwrap();
let valid_until = SystemTime::now() + length;
let entry = gc_info.leases.entry(lsn);
let lease = {
if let Entry::Occupied(mut occupied) = entry {
let existing_lease = occupied.get_mut();
if valid_until > existing_lease.valid_until {
existing_lease.valid_until = valid_until;
}
existing_lease.clone()
} else {
// Reject already GC-ed LSN (lsn < latest_gc_cutoff)
let latest_gc_cutoff_lsn = self.get_latest_gc_cutoff_lsn();
if lsn < *latest_gc_cutoff_lsn {
bail!("tried to request a page version that was garbage collected. requested at {} gc cutoff {}", lsn, *latest_gc_cutoff_lsn);
}
entry.or_insert(LsnLease { valid_until }).clone()
}
};
lease
const LEASE_LENGTH: Duration = Duration::from_secs(5 * 60);
let lease = LsnLease {
valid_until: SystemTime::now() + LEASE_LENGTH,
};
// TODO: dummy implementation
Ok(lease)
}
@@ -2117,24 +2118,6 @@ const REPARTITION_FREQ_IN_CHECKPOINT_DISTANCE: u64 = 10;
// Private functions
impl Timeline {
pub(crate) fn get_lsn_lease_length(&self) -> Duration {
let tenant_conf = self.tenant_conf.load();
tenant_conf
.tenant_conf
.lsn_lease_length
.unwrap_or(self.conf.default_tenant_conf.lsn_lease_length)
}
// TODO(yuchen): remove unused flag after implementing https://github.com/neondatabase/neon/issues/8072
#[allow(unused)]
pub(crate) fn get_lsn_lease_length_for_ts(&self) -> Duration {
let tenant_conf = self.tenant_conf.load();
tenant_conf
.tenant_conf
.lsn_lease_length_for_ts
.unwrap_or(self.conf.default_tenant_conf.lsn_lease_length_for_ts)
}
pub(crate) fn get_switch_aux_file_policy(&self) -> AuxFilePolicy {
let tenant_conf = self.tenant_conf.load();
tenant_conf
@@ -2218,6 +2201,15 @@ impl Timeline {
.unwrap_or(default_tenant_conf.evictions_low_residence_duration_metric_threshold)
}
#[cfg(feature = "testing")]
fn get_test_vm_bit_debug_logging(&self) -> bool {
let tenant_conf = self.tenant_conf.load();
tenant_conf
.tenant_conf
.test_vm_bit_debug_logging
.unwrap_or(self.conf.default_tenant_conf.test_vm_bit_debug_logging)
}
fn get_image_layer_creation_check_threshold(&self) -> u8 {
let tenant_conf = self.tenant_conf.load();
tenant_conf
@@ -2233,6 +2225,10 @@ impl Timeline {
pub(super) fn tenant_conf_updated(&self, new_conf: &TenantConfOpt) {
// NB: Most tenant conf options are read by background loops, so,
// changes will automatically be picked up.
#[cfg(feature = "testing")]
{
info!(?new_conf.test_vm_bit_debug_logging, "updating tenant conf");
}
// The threshold is embedded in the metric. So, we need to update it.
{
@@ -2404,9 +2400,6 @@ impl Timeline {
aux_file_size_estimator: AuxFileSizeEstimator::new(aux_file_metrics),
last_aux_file_policy: AtomicAuxFilePolicy::new(aux_file_policy),
#[cfg(test)]
extra_test_dense_keyspace: ArcSwap::new(Arc::new(KeySpace::default())),
};
result.repartition_threshold =
result.get_checkpoint_distance() / REPARTITION_FREQ_IN_CHECKPOINT_DISTANCE;
@@ -4875,7 +4868,7 @@ impl Timeline {
pitr: Duration,
cancel: &CancellationToken,
ctx: &RequestContext,
) -> Result<GcCutoffs, PageReconstructError> {
) -> anyhow::Result<GcCutoffs> {
let _timer = self
.metrics
.find_gc_cutoffs_histo
@@ -4960,25 +4953,13 @@ impl Timeline {
return Err(GcError::TimelineCancelled);
}
let (horizon_cutoff, pitr_cutoff, retain_lsns, max_lsn_with_valid_lease) = {
let (horizon_cutoff, pitr_cutoff, retain_lsns) = {
let gc_info = self.gc_info.read().unwrap();
let horizon_cutoff = min(gc_info.cutoffs.horizon, self.get_disk_consistent_lsn());
let pitr_cutoff = gc_info.cutoffs.pitr;
let retain_lsns = gc_info.retain_lsns.clone();
// Gets the maximum LSN that holds the valid lease.
//
// Caveat: `refresh_gc_info` is in charged of updating the lease map.
// Here, we do not check for stale leases again.
let max_lsn_with_valid_lease = gc_info.leases.last_key_value().map(|(lsn, _)| *lsn);
(
horizon_cutoff,
pitr_cutoff,
retain_lsns,
max_lsn_with_valid_lease,
)
(horizon_cutoff, pitr_cutoff, retain_lsns)
};
let mut new_gc_cutoff = Lsn::min(horizon_cutoff, pitr_cutoff);
@@ -5009,13 +4990,7 @@ impl Timeline {
.set(Lsn::INVALID.0 as i64);
let res = self
.gc_timeline(
horizon_cutoff,
pitr_cutoff,
retain_lsns,
max_lsn_with_valid_lease,
new_gc_cutoff,
)
.gc_timeline(horizon_cutoff, pitr_cutoff, retain_lsns, new_gc_cutoff)
.instrument(
info_span!("gc_timeline", timeline_id = %self.timeline_id, cutoff = %new_gc_cutoff),
)
@@ -5032,7 +5007,6 @@ impl Timeline {
horizon_cutoff: Lsn,
pitr_cutoff: Lsn,
retain_lsns: Vec<Lsn>,
max_lsn_with_valid_lease: Option<Lsn>,
new_gc_cutoff: Lsn,
) -> Result<GcResult, GcError> {
// FIXME: if there is an ongoing detach_from_ancestor, we should just skip gc
@@ -5081,8 +5055,7 @@ impl Timeline {
// 1. it is older than cutoff LSN;
// 2. it is older than PITR interval;
// 3. it doesn't need to be retained for 'retain_lsns';
// 4. it does not need to be kept for LSNs holding valid leases.
// 5. newer on-disk image layers cover the layer's whole key range
// 4. newer on-disk image layers cover the layer's whole key range
//
// TODO holding a write lock is too agressive and avoidable
let mut guard = self.layers.write().await;
@@ -5133,21 +5106,7 @@ impl Timeline {
}
}
// 4. Is there a valid lease that requires us to keep this layer?
if let Some(lsn) = &max_lsn_with_valid_lease {
// keep if layer start <= any of the lease
if &l.get_lsn_range().start <= lsn {
debug!(
"keeping {} because there is a valid lease preventing GC at {}",
l.layer_name(),
lsn,
);
result.layers_needed_by_leases += 1;
continue 'outer;
}
}
// 5. Is there a later on-disk layer for this relation?
// 4. Is there a later on-disk layer for this relation?
//
// The end-LSN is exclusive, while disk_consistent_lsn is
// inclusive. For example, if disk_consistent_lsn is 100, it is
@@ -5525,11 +5484,6 @@ impl Timeline {
self.last_record_lsn.advance(new_lsn);
}
#[cfg(test)]
pub(super) fn force_set_disk_consistent_lsn(&self, new_value: Lsn) {
self.disk_consistent_lsn.store(new_value);
}
/// Force create an image layer and place it into the layer map.
///
/// DO NOT use this function directly. Use [`Tenant::branch_timeline_test_with_layers`]
@@ -5664,13 +5618,6 @@ impl Timeline {
}
Ok(layers)
}
#[cfg(test)]
pub(crate) fn add_extra_test_dense_keyspace(&self, ks: KeySpace) {
let mut keyspace = self.extra_test_dense_keyspace.load().as_ref().clone();
keyspace.merge(&ks);
self.extra_test_dense_keyspace.store(Arc::new(keyspace));
}
}
type TraversalPathItem = (ValueReconstructResult, Lsn, TraversalId);

View File

@@ -11,6 +11,7 @@ use utils::{crashsafe, fs_ext, id::TimelineId, pausable_failpoint};
use crate::{
config::PageServerConf,
deletion_queue::DeletionQueueClient,
task_mgr::{self, TaskKind},
tenant::{
metadata::TimelineMetadata,
@@ -262,6 +263,7 @@ impl DeleteTimelineFlow {
timeline_id: TimelineId,
local_metadata: &TimelineMetadata,
remote_client: RemoteTimelineClient,
deletion_queue_client: DeletionQueueClient,
) -> anyhow::Result<()> {
// Note: here we even skip populating layer map. Timeline is essentially uninitialized.
// RemoteTimelineClient is the only functioning part.
@@ -272,6 +274,7 @@ impl DeleteTimelineFlow {
None, // Ancestor is not needed for deletion.
TimelineResources {
remote_client,
deletion_queue_client,
timeline_get_throttle: tenant.timeline_get_throttle.clone(),
},
// Important. We dont pass ancestor above because it can be missing.

View File

@@ -49,19 +49,6 @@ pub enum NeonWalRecord {
file_path: String,
content: Option<Bytes>,
},
/// A testing record for unit testing purposes. It supports append data to an existing image, or clear it.
#[cfg(test)]
Test {
/// Append a string to the image.
append: String,
/// Clear the image before appending.
clear: bool,
/// Treat this record as an init record. `clear` should be set to true if this field is set
/// to true. This record does not need the history WALs to reconstruct. See [`NeonWalRecord::will_init`] and
/// its references in `timeline.rs`.
will_init: bool,
},
}
impl NeonWalRecord {
@@ -71,39 +58,11 @@ impl NeonWalRecord {
// If you change this function, you'll also need to change ValueBytes::will_init
match self {
NeonWalRecord::Postgres { will_init, rec: _ } => *will_init,
#[cfg(test)]
NeonWalRecord::Test { will_init, .. } => *will_init,
// None of the special neon record types currently initialize the page
_ => false,
}
}
#[cfg(test)]
pub(crate) fn wal_append(s: impl AsRef<str>) -> Self {
Self::Test {
append: s.as_ref().to_string(),
clear: false,
will_init: false,
}
}
#[cfg(test)]
pub(crate) fn wal_clear() -> Self {
Self::Test {
append: "".to_string(),
clear: true,
will_init: false,
}
}
#[cfg(test)]
pub(crate) fn wal_init() -> Self {
Self::Test {
append: "".to_string(),
clear: true,
will_init: true,
}
}
}
/// DecodedBkpBlock represents per-page data contained in a WAL record.

View File

@@ -244,20 +244,6 @@ pub(crate) fn apply_in_neon(
let mut writer = page.writer();
dir.ser_into(&mut writer)?;
}
#[cfg(test)]
NeonWalRecord::Test {
append,
clear,
will_init,
} => {
if *will_init {
assert!(*clear, "init record must be clear to ensure correctness");
}
if *clear {
page.clear();
}
page.put_slice(append.as_bytes());
}
}
Ok(())
}

View File

@@ -1,8 +1,19 @@
From 0b0194a57bd0f3598bd57dbedd0df3932330169d Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas <heikki.linnakangas@iki.fi>
Date: Fri, 2 Feb 2024 22:26:45 +0200
Subject: [PATCH 1/1] Make v0.6.0 work with Neon
Now that the WAL-logging happens as a separate step at the end of the
build, we need a few neon-specific hints to make it work.
---
src/hnswbuild.c | 36 ++++++++++++++++++++++++++++++++++++
1 file changed, 36 insertions(+)
diff --git a/src/hnswbuild.c b/src/hnswbuild.c
index dcfb2bd..d5189ee 100644
index 680789b..ec54dea 100644
--- a/src/hnswbuild.c
+++ b/src/hnswbuild.c
@@ -860,9 +860,17 @@ HnswParallelBuildMain(dsm_segment *seg, shm_toc *toc)
@@ -840,9 +840,17 @@ HnswParallelBuildMain(dsm_segment *seg, shm_toc *toc)
hnswarea = shm_toc_lookup(toc, PARALLEL_KEY_HNSW_AREA, false);
@@ -20,7 +31,7 @@ index dcfb2bd..d5189ee 100644
/* Close relations within worker */
index_close(indexRel, indexLockmode);
table_close(heapRel, heapLockmode);
@@ -1117,12 +1125,38 @@ BuildIndex(Relation heap, Relation index, IndexInfo *indexInfo,
@@ -1089,13 +1097,41 @@ BuildIndex(Relation heap, Relation index, IndexInfo *indexInfo,
SeedRandom(42);
#endif
@@ -32,13 +43,14 @@ index dcfb2bd..d5189ee 100644
BuildGraph(buildstate, forkNum);
- if (RelationNeedsWAL(index) || forkNum == INIT_FORKNUM)
+#ifdef NEON_SMGR
+ smgr_finish_unlogged_build_phase_1(RelationGetSmgr(index));
+#endif
+
+ if (RelationNeedsWAL(index) || forkNum == INIT_FORKNUM) {
log_newpage_range(index, forkNum, 0, RelationGetNumberOfBlocksInFork(index, forkNum), true);
if (RelationNeedsWAL(index))
+ {
log_newpage_range(index, forkNum, 0, RelationGetNumberOfBlocks(index), true);
+#ifdef NEON_SMGR
+ {
+#if PG_VERSION_NUM >= 160000
@@ -48,7 +60,7 @@ index dcfb2bd..d5189ee 100644
+#endif
+
+ SetLastWrittenLSNForBlockRange(XactLastRecEnd, rlocator,
+ MAIN_FORKNUM, 0, RelationGetNumberOfBlocks(index));
+ MAIN_FORKNUM, 0, RelationGetNumberOfBlocks(index));
+ SetLastWrittenLSNForRelation(XactLastRecEnd, rlocator, MAIN_FORKNUM);
+ }
+#endif
@@ -57,6 +69,10 @@ index dcfb2bd..d5189ee 100644
+#ifdef NEON_SMGR
+ smgr_end_unlogged_build(RelationGetSmgr(index));
+#endif
+
FreeBuildState(buildstate);
}
--
2.39.2

View File

@@ -381,15 +381,6 @@ pageserver_connect(shardno_t shard_no, int elevel)
us_since_last_attempt = (int64) (now - shard->last_reconnect_time);
shard->last_reconnect_time = now;
/*
* Make sure we don't do exponential backoff with a constant multiplier
* of 0 us, as that doesn't really do much for timeouts...
*
* cf. https://github.com/neondatabase/neon/issues/7897
*/
if (shard->delay_us == 0)
shard->delay_us = MIN_RECONNECT_INTERVAL_USEC;
/*
* If we did other tasks between reconnect attempts, then we won't
* need to wait as long as a full delay.

View File

@@ -3112,12 +3112,12 @@ neon_read_slru_segment(SMgrRelation reln, const char* path, int segno, void* buf
request_lsn = UINT64_MAX;
/*
* GetRedoStartLsn() returns LSN of the basebackup. We know that the SLRU
* GetRedoStartLsn() returns LSN of basebackup. We know that the SLRU
* segment has not changed since the basebackup, because in order to
* modify it, we would have had to download it already. And once
* downloaded, we never evict SLRU segments from local disk.
*/
not_modified_since = nm_adjust_lsn(GetRedoStartLsn());
not_modified_since = GetRedoStartLsn();
SlruKind kind;

View File

@@ -100,12 +100,17 @@ static void StartProposerReplication(WalProposer *wp, StartReplicationCmd *cmd);
static void WalSndLoop(WalProposer *wp);
static void XLogBroadcastWalProposer(WalProposer *wp);
static void XLogWalPropWrite(WalProposer *wp, char *buf, Size nbytes, XLogRecPtr recptr);
static void XLogWalPropClose(XLogRecPtr recptr);
static void add_nwr_event_set(Safekeeper *sk, uint32 events);
static void update_nwr_event_set(Safekeeper *sk, uint32 events);
static void rm_safekeeper_event_set(Safekeeper *to_remove, bool is_sk);
static void CheckGracefulShutdown(WalProposer *wp);
static XLogRecPtr GetLogRepRestartLSN(WalProposer *wp);
static void
init_walprop_config(bool syncSafekeepers)
{
@@ -1231,6 +1236,8 @@ StartProposerReplication(WalProposer *wp, StartReplicationCmd *cmd)
static void
WalSndLoop(WalProposer *wp)
{
XLogRecPtr flushPtr;
/* Clear any already-pending wakeups */
ResetLatch(MyLatch);
@@ -1326,9 +1333,8 @@ XLogBroadcastWalProposer(WalProposer *wp)
}
/*
Used to download WAL before basebackup for walproposer/logical walsenders. No
longer used, replaced by neon_walreader; but callback still exists because
simulation tests use it.
Used to download WAL before basebackup for logical walsenders from sk, no longer
needed because walsender always uses neon_walreader.
*/
static bool
WalProposerRecovery(WalProposer *wp, Safekeeper *sk)
@@ -1336,6 +1342,136 @@ WalProposerRecovery(WalProposer *wp, Safekeeper *sk)
return true;
}
/*
* These variables are used similarly to openLogFile/SegNo,
* but for walproposer to write the XLOG during recovery. walpropFileTLI is the TimeLineID
* corresponding the filename of walpropFile.
*/
static int walpropFile = -1;
static TimeLineID walpropFileTLI = 0;
static XLogSegNo walpropSegNo = 0;
/*
* Write XLOG data to disk.
*/
static void
XLogWalPropWrite(WalProposer *wp, char *buf, Size nbytes, XLogRecPtr recptr)
{
int startoff;
int byteswritten;
/*
* Apart from walproposer, basebackup LSN page is also written out by
* postgres itself which writes WAL only in pages, and in basebackup it is
* inherently dummy (only safekeepers have historic WAL). Update WAL
* buffers here to avoid dummy page overwriting correct one we download
* here. Ugly, but alternatives are about the same ugly. We won't need
* that if we switch to on-demand WAL download from safekeepers, without
* writing to disk.
*
* https://github.com/neondatabase/neon/issues/5749
*/
if (!wp->config->syncSafekeepers)
XLogUpdateWalBuffers(buf, recptr, nbytes);
while (nbytes > 0)
{
int segbytes;
/* Close the current segment if it's completed */
if (walpropFile >= 0 && !XLByteInSeg(recptr, walpropSegNo, wal_segment_size))
XLogWalPropClose(recptr);
if (walpropFile < 0)
{
#if PG_VERSION_NUM >= 150000
/* FIXME Is it ok to use hardcoded value here? */
TimeLineID tli = 1;
#else
bool use_existent = true;
#endif
/* Create/use new log file */
XLByteToSeg(recptr, walpropSegNo, wal_segment_size);
#if PG_VERSION_NUM >= 150000
walpropFile = XLogFileInit(walpropSegNo, tli);
walpropFileTLI = tli;
#else
walpropFile = XLogFileInit(walpropSegNo, &use_existent, false);
walpropFileTLI = ThisTimeLineID;
#endif
}
/* Calculate the start offset of the received logs */
startoff = XLogSegmentOffset(recptr, wal_segment_size);
if (startoff + nbytes > wal_segment_size)
segbytes = wal_segment_size - startoff;
else
segbytes = nbytes;
/* OK to write the logs */
errno = 0;
byteswritten = pg_pwrite(walpropFile, buf, segbytes, (off_t) startoff);
if (byteswritten <= 0)
{
char xlogfname[MAXFNAMELEN];
int save_errno;
/* if write didn't set errno, assume no disk space */
if (errno == 0)
errno = ENOSPC;
save_errno = errno;
XLogFileName(xlogfname, walpropFileTLI, walpropSegNo, wal_segment_size);
errno = save_errno;
ereport(PANIC,
(errcode_for_file_access(),
errmsg("could not write to log segment %s "
"at offset %u, length %lu: %m",
xlogfname, startoff, (unsigned long) segbytes)));
}
/* Update state for write */
recptr += byteswritten;
nbytes -= byteswritten;
buf += byteswritten;
}
/*
* Close the current segment if it's fully written up in the last cycle of
* the loop.
*/
if (walpropFile >= 0 && !XLByteInSeg(recptr, walpropSegNo, wal_segment_size))
{
XLogWalPropClose(recptr);
}
}
/*
* Close the current segment.
*/
static void
XLogWalPropClose(XLogRecPtr recptr)
{
Assert(walpropFile >= 0 && !XLByteInSeg(recptr, walpropSegNo, wal_segment_size));
if (close(walpropFile) != 0)
{
char xlogfname[MAXFNAMELEN];
XLogFileName(xlogfname, walpropFileTLI, walpropSegNo, wal_segment_size);
ereport(PANIC,
(errcode_for_file_access(),
errmsg("could not close log segment %s: %m",
xlogfname)));
}
walpropFile = -1;
}
static void
walprop_pg_wal_reader_allocate(Safekeeper *sk)
{
@@ -1851,6 +1987,58 @@ walprop_pg_log_internal(WalProposer *wp, int level, const char *line)
elog(FATAL, "unexpected log_internal message at level %d: %s", level, line);
}
static XLogRecPtr
GetLogRepRestartLSN(WalProposer *wp)
{
FILE *f;
XLogRecPtr lrRestartLsn = InvalidXLogRecPtr;
/* We don't need to do anything in syncSafekeepers mode. */
if (wp->config->syncSafekeepers)
return InvalidXLogRecPtr;
/*
* If there are active logical replication subscription we need to provide
* enough WAL for their WAL senders based on th position of their
* replication slots.
*/
f = fopen("restart.lsn", "rb");
if (f != NULL)
{
size_t rc = fread(&lrRestartLsn, sizeof(lrRestartLsn), 1, f);
fclose(f);
if (rc == 1 && lrRestartLsn != InvalidXLogRecPtr)
{
uint64 download_range_mb;
wpg_log(LOG, "logical replication restart LSN %X/%X", LSN_FORMAT_ARGS(lrRestartLsn));
/*
* If we need to download more than a max_slot_wal_keep_size,
* don't do it to avoid risk of exploding pg_wal. Logical
* replication won't work until recreated, but at least compute
* would start; this also follows max_slot_wal_keep_size
* semantics.
*/
download_range_mb = (wp->propEpochStartLsn - lrRestartLsn) / MB;
if (max_slot_wal_keep_size_mb > 0 && download_range_mb >= max_slot_wal_keep_size_mb)
{
wpg_log(WARNING, "not downloading WAL for logical replication since %X/%X as max_slot_wal_keep_size=%dMB",
LSN_FORMAT_ARGS(lrRestartLsn), max_slot_wal_keep_size_mb);
return InvalidXLogRecPtr;
}
/*
* start from the beginning of the segment to fetch page headers
* verifed by XLogReader
*/
lrRestartLsn = lrRestartLsn - XLogSegmentOffset(lrRestartLsn, wal_segment_size);
}
}
return lrRestartLsn;
}
void
SetNeonCurrentClusterSize(uint64 size)
{

8
poetry.lock generated
View File

@@ -2806,13 +2806,13 @@ files = [
[[package]]
name = "urllib3"
version = "1.26.19"
version = "1.26.18"
description = "HTTP library with thread-safe connection pooling, file post, and more."
optional = false
python-versions = "!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*,!=3.4.*,!=3.5.*,>=2.7"
python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*, !=3.5.*"
files = [
{file = "urllib3-1.26.19-py2.py3-none-any.whl", hash = "sha256:37a0344459b199fce0e80b0d3569837ec6b6937435c5244e7fd73fa6006830f3"},
{file = "urllib3-1.26.19.tar.gz", hash = "sha256:3e3d753a8618b86d7de333b4223005f68720bcd6a7d2bcb9fbd2229ec7c1e429"},
{file = "urllib3-1.26.18-py2.py3-none-any.whl", hash = "sha256:34b97092d7e0a3a8cf7cd10e386f401b3737364026c45e622aa02903dffe0f07"},
{file = "urllib3-1.26.18.tar.gz", hash = "sha256:f8ecc1bba5667413457c529ab955bf8c67b45db799d159066261719e328580a0"},
]
[package.extras]

View File

@@ -1,183 +1,16 @@
use measured::FixedCardinalityLabel;
use serde::{Deserialize, Serialize};
use std::fmt::{self, Display};
use std::fmt;
use crate::auth::IpPattern;
use crate::intern::{BranchIdInt, EndpointIdInt, ProjectIdInt};
use crate::proxy::retry::ShouldRetry;
/// Generic error response with human-readable description.
/// Note that we can't always present it to user as is.
#[derive(Debug, Deserialize)]
pub struct ConsoleError {
pub error: Box<str>,
#[serde(skip)]
pub http_status_code: http::StatusCode,
pub status: Option<Status>,
}
impl ConsoleError {
pub fn get_reason(&self) -> Reason {
self.status
.as_ref()
.and_then(|s| s.details.error_info.as_ref())
.map(|e| e.reason)
.unwrap_or(Reason::Unknown)
}
pub fn get_user_facing_message(&self) -> String {
use super::provider::errors::REQUEST_FAILED;
self.status
.as_ref()
.and_then(|s| s.details.user_facing_message.as_ref())
.map(|m| m.message.clone().into())
.unwrap_or_else(|| {
// Ask @neondatabase/control-plane for review before adding more.
match self.http_status_code {
http::StatusCode::NOT_FOUND => {
// Status 404: failed to get a project-related resource.
format!("{REQUEST_FAILED}: endpoint cannot be found")
}
http::StatusCode::NOT_ACCEPTABLE => {
// Status 406: endpoint is disabled (we don't allow connections).
format!("{REQUEST_FAILED}: endpoint is disabled")
}
http::StatusCode::LOCKED | http::StatusCode::UNPROCESSABLE_ENTITY => {
// Status 423: project might be in maintenance mode (or bad state), or quotas exceeded.
format!("{REQUEST_FAILED}: endpoint is temporarily unavailable. Check your quotas and/or contact our support.")
}
_ => REQUEST_FAILED.to_owned(),
}
})
}
}
impl Display for ConsoleError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let msg = self
.status
.as_ref()
.and_then(|s| s.details.user_facing_message.as_ref())
.map(|m| m.message.as_ref())
.unwrap_or_else(|| &self.error);
write!(f, "{}", msg)
}
}
impl ShouldRetry for ConsoleError {
fn could_retry(&self) -> bool {
if self.status.is_none() || self.status.as_ref().unwrap().details.retry_info.is_none() {
// retry some temporary failures because the compute was in a bad state
// (bad request can be returned when the endpoint was in transition)
return match &self {
ConsoleError {
http_status_code: http::StatusCode::BAD_REQUEST,
..
} => true,
// don't retry when quotas are exceeded
ConsoleError {
http_status_code: http::StatusCode::UNPROCESSABLE_ENTITY,
ref error,
..
} => !error.contains("compute time quota of non-primary branches is exceeded"),
// locked can be returned when the endpoint was in transition
// or when quotas are exceeded. don't retry when quotas are exceeded
ConsoleError {
http_status_code: http::StatusCode::LOCKED,
ref error,
..
} => {
!error.contains("quota exceeded")
&& !error.contains("the limit for current plan reached")
}
_ => false,
};
}
// retry if the response has a retry delay
if let Some(retry_info) = self
.status
.as_ref()
.and_then(|s| s.details.retry_info.as_ref())
{
retry_info.retry_delay_ms > 0
} else {
false
}
}
}
#[derive(Debug, Deserialize)]
pub struct Status {
pub code: Box<str>,
pub message: Box<str>,
pub details: Details,
}
#[derive(Debug, Deserialize)]
pub struct Details {
pub error_info: Option<ErrorInfo>,
pub retry_info: Option<RetryInfo>,
pub user_facing_message: Option<UserFacingMessage>,
}
#[derive(Debug, Deserialize)]
pub struct ErrorInfo {
pub reason: Reason,
// Schema could also have `metadata` field, but it's not structured. Skip it for now.
}
#[derive(Clone, Copy, Debug, Deserialize, Default)]
pub enum Reason {
#[serde(rename = "ROLE_PROTECTED")]
RoleProtected,
#[serde(rename = "RESOURCE_NOT_FOUND")]
ResourceNotFound,
#[serde(rename = "PROJECT_NOT_FOUND")]
ProjectNotFound,
#[serde(rename = "ENDPOINT_NOT_FOUND")]
EndpointNotFound,
#[serde(rename = "BRANCH_NOT_FOUND")]
BranchNotFound,
#[serde(rename = "RATE_LIMIT_EXCEEDED")]
RateLimitExceeded,
#[serde(rename = "NON_PRIMARY_BRANCH_COMPUTE_TIME_EXCEEDED")]
NonPrimaryBranchComputeTimeExceeded,
#[serde(rename = "ACTIVE_TIME_QUOTA_EXCEEDED")]
ActiveTimeQuotaExceeded,
#[serde(rename = "COMPUTE_TIME_QUOTA_EXCEEDED")]
ComputeTimeQuotaExceeded,
#[serde(rename = "WRITTEN_DATA_QUOTA_EXCEEDED")]
WrittenDataQuotaExceeded,
#[serde(rename = "DATA_TRANSFER_QUOTA_EXCEEDED")]
DataTransferQuotaExceeded,
#[serde(rename = "LOGICAL_SIZE_QUOTA_EXCEEDED")]
LogicalSizeQuotaExceeded,
#[default]
#[serde(other)]
Unknown,
}
impl Reason {
pub fn is_not_found(&self) -> bool {
matches!(
self,
Reason::ResourceNotFound
| Reason::ProjectNotFound
| Reason::EndpointNotFound
| Reason::BranchNotFound
)
}
}
#[derive(Debug, Deserialize)]
pub struct RetryInfo {
pub retry_delay_ms: u64,
}
#[derive(Debug, Deserialize)]
pub struct UserFacingMessage {
pub message: Box<str>,
}
/// Response which holds client's auth secret, e.g. [`crate::scram::ServerSecret`].

View File

@@ -25,8 +25,8 @@ use tracing::info;
pub mod errors {
use crate::{
console::messages::{self, ConsoleError},
error::{io_error, ReportableError, UserFacingError},
http,
proxy::retry::ShouldRetry,
};
use thiserror::Error;
@@ -34,14 +34,17 @@ pub mod errors {
use super::ApiLockError;
/// A go-to error message which doesn't leak any detail.
pub const REQUEST_FAILED: &str = "Console request failed";
const REQUEST_FAILED: &str = "Console request failed";
/// Common console API error.
#[derive(Debug, Error)]
pub enum ApiError {
/// Error returned by the console itself.
#[error("{REQUEST_FAILED} with {0}")]
Console(ConsoleError),
#[error("{REQUEST_FAILED} with {}: {}", .status, .text)]
Console {
status: http::StatusCode,
text: Box<str>,
},
/// Various IO errors like broken pipe or malformed payload.
#[error("{REQUEST_FAILED}: {0}")]
@@ -50,11 +53,11 @@ pub mod errors {
impl ApiError {
/// Returns HTTP status code if it's the reason for failure.
pub fn get_reason(&self) -> messages::Reason {
pub fn http_status_code(&self) -> Option<http::StatusCode> {
use ApiError::*;
match self {
Console(e) => e.get_reason(),
_ => messages::Reason::Unknown,
Console { status, .. } => Some(*status),
_ => None,
}
}
}
@@ -64,7 +67,22 @@ pub mod errors {
use ApiError::*;
match self {
// To minimize risks, only select errors are forwarded to users.
Console(c) => c.get_user_facing_message(),
// Ask @neondatabase/control-plane for review before adding more.
Console { status, .. } => match *status {
http::StatusCode::NOT_FOUND => {
// Status 404: failed to get a project-related resource.
format!("{REQUEST_FAILED}: endpoint cannot be found")
}
http::StatusCode::NOT_ACCEPTABLE => {
// Status 406: endpoint is disabled (we don't allow connections).
format!("{REQUEST_FAILED}: endpoint is disabled")
}
http::StatusCode::LOCKED | http::StatusCode::UNPROCESSABLE_ENTITY => {
// Status 423: project might be in maintenance mode (or bad state), or quotas exceeded.
format!("{REQUEST_FAILED}: endpoint is temporarily unavailable. Check your quotas and/or contact our support.")
}
_ => REQUEST_FAILED.to_owned(),
},
_ => REQUEST_FAILED.to_owned(),
}
}
@@ -73,56 +91,29 @@ pub mod errors {
impl ReportableError for ApiError {
fn get_error_kind(&self) -> crate::error::ErrorKind {
match self {
ApiError::Console(e) => {
use crate::error::ErrorKind::*;
match e.get_reason() {
crate::console::messages::Reason::RoleProtected => User,
crate::console::messages::Reason::ResourceNotFound => User,
crate::console::messages::Reason::ProjectNotFound => User,
crate::console::messages::Reason::EndpointNotFound => User,
crate::console::messages::Reason::BranchNotFound => User,
crate::console::messages::Reason::RateLimitExceeded => ServiceRateLimit,
crate::console::messages::Reason::NonPrimaryBranchComputeTimeExceeded => {
User
}
crate::console::messages::Reason::ActiveTimeQuotaExceeded => User,
crate::console::messages::Reason::ComputeTimeQuotaExceeded => User,
crate::console::messages::Reason::WrittenDataQuotaExceeded => User,
crate::console::messages::Reason::DataTransferQuotaExceeded => User,
crate::console::messages::Reason::LogicalSizeQuotaExceeded => User,
crate::console::messages::Reason::Unknown => match &e {
ConsoleError {
http_status_code:
http::StatusCode::NOT_FOUND | http::StatusCode::NOT_ACCEPTABLE,
..
} => crate::error::ErrorKind::User,
ConsoleError {
http_status_code: http::StatusCode::UNPROCESSABLE_ENTITY,
error,
..
} if error.contains(
"compute time quota of non-primary branches is exceeded",
) =>
{
crate::error::ErrorKind::User
}
ConsoleError {
http_status_code: http::StatusCode::LOCKED,
error,
..
} if error.contains("quota exceeded")
|| error.contains("the limit for current plan reached") =>
{
crate::error::ErrorKind::User
}
ConsoleError {
http_status_code: http::StatusCode::TOO_MANY_REQUESTS,
..
} => crate::error::ErrorKind::ServiceRateLimit,
ConsoleError { .. } => crate::error::ErrorKind::ControlPlane,
},
}
ApiError::Console {
status: http::StatusCode::NOT_FOUND | http::StatusCode::NOT_ACCEPTABLE,
..
} => crate::error::ErrorKind::User,
ApiError::Console {
status: http::StatusCode::UNPROCESSABLE_ENTITY,
text,
} if text.contains("compute time quota of non-primary branches is exceeded") => {
crate::error::ErrorKind::User
}
ApiError::Console {
status: http::StatusCode::LOCKED,
text,
} if text.contains("quota exceeded")
|| text.contains("the limit for current plan reached") =>
{
crate::error::ErrorKind::User
}
ApiError::Console {
status: http::StatusCode::TOO_MANY_REQUESTS,
..
} => crate::error::ErrorKind::ServiceRateLimit,
ApiError::Console { .. } => crate::error::ErrorKind::ControlPlane,
ApiError::Transport(_) => crate::error::ErrorKind::ControlPlane,
}
}
@@ -133,7 +124,31 @@ pub mod errors {
match self {
// retry some transport errors
Self::Transport(io) => io.could_retry(),
Self::Console(e) => e.could_retry(),
// retry some temporary failures because the compute was in a bad state
// (bad request can be returned when the endpoint was in transition)
Self::Console {
status: http::StatusCode::BAD_REQUEST,
..
} => true,
// don't retry when quotas are exceeded
Self::Console {
status: http::StatusCode::UNPROCESSABLE_ENTITY,
ref text,
} => !text.contains("compute time quota of non-primary branches is exceeded"),
// locked can be returned when the endpoint was in transition
// or when quotas are exceeded. don't retry when quotas are exceeded
Self::Console {
status: http::StatusCode::LOCKED,
ref text,
} => {
// written data quota exceeded
// data transfer quota exceeded
// compute time quota exceeded
// logical size quota exceeded
!text.contains("quota exceeded")
&& !text.contains("the limit for current plan reached")
}
_ => false,
}
}
}
@@ -494,7 +509,7 @@ impl<K: Hash + Eq + Clone> ApiLocks<K> {
self.metrics
.semaphore_acquire_seconds
.observe(now.elapsed().as_secs_f64());
info!("acquired permit {:?}", now.elapsed().as_secs_f64());
Ok(WakeComputePermit { permit: permit? })
}

View File

@@ -94,14 +94,12 @@ impl Api {
let body = match parse_body::<GetRoleSecret>(response).await {
Ok(body) => body,
// Error 404 is special: it's ok not to have a secret.
// TODO(anna): retry
Err(e) => {
if e.get_reason().is_not_found() {
Err(e) => match e.http_status_code() {
Some(http::StatusCode::NOT_FOUND) => {
return Ok(AuthInfo::default());
} else {
return Err(e.into());
}
}
_otherwise => return Err(e.into()),
},
};
let secret = if body.role_secret.is_empty() {
@@ -330,24 +328,19 @@ async fn parse_body<T: for<'a> serde::Deserialize<'a>>(
info!("request succeeded, processing the body");
return Ok(response.json().await?);
}
let s = response.bytes().await?;
// Log plaintext to be able to detect, whether there are some cases not covered by the error struct.
info!("response_error plaintext: {:?}", s);
// Don't throw an error here because it's not as important
// as the fact that the request itself has failed.
let mut body = serde_json::from_slice(&s).unwrap_or_else(|e| {
let body = response.json().await.unwrap_or_else(|e| {
warn!("failed to parse error body: {e}");
ConsoleError {
error: "reason unclear (malformed error message)".into(),
http_status_code: status,
status: None,
}
});
body.http_status_code = status;
error!("console responded with an error ({status}): {body:?}");
Err(ApiError::Console(body))
let text = body.error;
error!("console responded with an error ({status}): {text}");
Err(ApiError::Console { status, text })
}
fn parse_host_port(input: &str) -> Option<(&str, u16)> {

View File

@@ -91,7 +91,7 @@ pub async fn task_main(
let endpoint_rate_limiter2 = endpoint_rate_limiter.clone();
connections.spawn(async move {
let (socket, peer_addr) = match read_proxy_protocol(socket).await {
let (socket, peer_addr) = match read_proxy_protocol(socket).await{
Ok((socket, Some(addr))) => (socket, addr.ip()),
Err(e) => {
error!("per-client task finished with an error: {e:#}");
@@ -101,38 +101,36 @@ pub async fn task_main(
error!("missing required client IP");
return;
}
Ok((socket, None)) => (socket, peer_addr.ip()),
Ok((socket, None)) => (socket, peer_addr.ip())
};
match socket.inner.set_nodelay(true) {
Ok(()) => {}
Ok(()) => {},
Err(e) => {
error!("per-client task finished with an error: failed to set socket option: {e:#}");
return;
}
},
};
let mut ctx = RequestMonitoring::new(
session_id,
peer_addr,
crate::metrics::Protocol::Tcp,
&config.region,
);
session_id,
peer_addr,
crate::metrics::Protocol::Tcp,
&config.region,
);
let span = ctx.span.clone();
let startup = Box::pin(
handle_client(
config,
&mut ctx,
cancellation_handler,
socket,
ClientMode::Tcp,
endpoint_rate_limiter2,
conn_gauge,
)
.instrument(span.clone()),
);
let res = startup.await;
let res = handle_client(
config,
&mut ctx,
cancellation_handler,
socket,
ClientMode::Tcp,
endpoint_rate_limiter2,
conn_gauge,
)
.instrument(span.clone())
.await;
match res {
Err(e) => {

View File

@@ -98,7 +98,7 @@ pub(super) struct CopyBuffer {
amt: u64,
buf: Box<[u8]>,
}
const DEFAULT_BUF_SIZE: usize = 1024;
const DEFAULT_BUF_SIZE: usize = 8 * 1024;
impl CopyBuffer {
pub(super) fn new() -> Self {

View File

@@ -12,7 +12,7 @@ use crate::auth::backend::{
};
use crate::config::{CertResolver, RetryConfig};
use crate::console::caches::NodeInfoCache;
use crate::console::messages::{ConsoleError, MetricsAuxInfo};
use crate::console::messages::MetricsAuxInfo;
use crate::console::provider::{CachedAllowedIps, CachedRoleSecret, ConsoleBackend};
use crate::console::{self, CachedNodeInfo, NodeInfo};
use crate::error::ErrorKind;
@@ -484,20 +484,18 @@ impl TestBackend for TestConnectMechanism {
match action {
ConnectAction::Wake => Ok(helper_create_cached_node_info(self.cache)),
ConnectAction::WakeFail => {
let err = console::errors::ApiError::Console(ConsoleError {
http_status_code: http::StatusCode::FORBIDDEN,
error: "TEST".into(),
status: None,
});
let err = console::errors::ApiError::Console {
status: http::StatusCode::FORBIDDEN,
text: "TEST".into(),
};
assert!(!err.could_retry());
Err(console::errors::WakeComputeError::ApiError(err))
}
ConnectAction::WakeRetry => {
let err = console::errors::ApiError::Console(ConsoleError {
http_status_code: http::StatusCode::BAD_REQUEST,
error: "TEST".into(),
status: None,
});
let err = console::errors::ApiError::Console {
status: http::StatusCode::BAD_REQUEST,
text: "TEST".into(),
};
assert!(err.could_retry());
Err(console::errors::WakeComputeError::ApiError(err))
}

View File

@@ -1,5 +1,4 @@
use crate::config::RetryConfig;
use crate::console::messages::ConsoleError;
use crate::console::{errors::WakeComputeError, provider::CachedNodeInfo};
use crate::context::RequestMonitoring;
use crate::metrics::{
@@ -89,76 +88,36 @@ fn report_error(e: &WakeComputeError, retry: bool) {
let kind = match e {
WakeComputeError::BadComputeAddress(_) => WakeupFailureKind::BadComputeAddress,
WakeComputeError::ApiError(ApiError::Transport(_)) => WakeupFailureKind::ApiTransportError,
WakeComputeError::ApiError(ApiError::Console(e)) => match e.get_reason() {
crate::console::messages::Reason::RoleProtected => {
WakeupFailureKind::ApiConsoleBadRequest
}
crate::console::messages::Reason::ResourceNotFound => {
WakeupFailureKind::ApiConsoleBadRequest
}
crate::console::messages::Reason::ProjectNotFound => {
WakeupFailureKind::ApiConsoleBadRequest
}
crate::console::messages::Reason::EndpointNotFound => {
WakeupFailureKind::ApiConsoleBadRequest
}
crate::console::messages::Reason::BranchNotFound => {
WakeupFailureKind::ApiConsoleBadRequest
}
crate::console::messages::Reason::RateLimitExceeded => {
WakeupFailureKind::ApiConsoleLocked
}
crate::console::messages::Reason::NonPrimaryBranchComputeTimeExceeded => {
WakeupFailureKind::QuotaExceeded
}
crate::console::messages::Reason::ActiveTimeQuotaExceeded => {
WakeupFailureKind::QuotaExceeded
}
crate::console::messages::Reason::ComputeTimeQuotaExceeded => {
WakeupFailureKind::QuotaExceeded
}
crate::console::messages::Reason::WrittenDataQuotaExceeded => {
WakeupFailureKind::QuotaExceeded
}
crate::console::messages::Reason::DataTransferQuotaExceeded => {
WakeupFailureKind::QuotaExceeded
}
crate::console::messages::Reason::LogicalSizeQuotaExceeded => {
WakeupFailureKind::QuotaExceeded
}
crate::console::messages::Reason::Unknown => match e {
ConsoleError {
http_status_code: StatusCode::LOCKED,
ref error,
..
} if error.contains("written data quota exceeded")
|| error.contains("the limit for current plan reached") =>
{
WakeupFailureKind::QuotaExceeded
}
ConsoleError {
http_status_code: StatusCode::UNPROCESSABLE_ENTITY,
ref error,
..
} if error.contains("compute time quota of non-primary branches is exceeded") => {
WakeupFailureKind::QuotaExceeded
}
ConsoleError {
http_status_code: StatusCode::LOCKED,
..
} => WakeupFailureKind::ApiConsoleLocked,
ConsoleError {
http_status_code: StatusCode::BAD_REQUEST,
..
} => WakeupFailureKind::ApiConsoleBadRequest,
ConsoleError {
http_status_code, ..
} if http_status_code.is_server_error() => {
WakeupFailureKind::ApiConsoleOtherServerError
}
ConsoleError { .. } => WakeupFailureKind::ApiConsoleOtherError,
},
},
WakeComputeError::ApiError(ApiError::Console {
status: StatusCode::LOCKED,
ref text,
}) if text.contains("written data quota exceeded")
|| text.contains("the limit for current plan reached") =>
{
WakeupFailureKind::QuotaExceeded
}
WakeComputeError::ApiError(ApiError::Console {
status: StatusCode::UNPROCESSABLE_ENTITY,
ref text,
}) if text.contains("compute time quota of non-primary branches is exceeded") => {
WakeupFailureKind::QuotaExceeded
}
WakeComputeError::ApiError(ApiError::Console {
status: StatusCode::LOCKED,
..
}) => WakeupFailureKind::ApiConsoleLocked,
WakeComputeError::ApiError(ApiError::Console {
status: StatusCode::BAD_REQUEST,
..
}) => WakeupFailureKind::ApiConsoleBadRequest,
WakeComputeError::ApiError(ApiError::Console { status, .. })
if status.is_server_error() =>
{
WakeupFailureKind::ApiConsoleOtherServerError
}
WakeComputeError::ApiError(ApiError::Console { .. }) => {
WakeupFailureKind::ApiConsoleOtherError
}
WakeComputeError::TooManyConnections => WakeupFailureKind::ApiConsoleLocked,
WakeComputeError::TooManyConnectionAttempts(_) => WakeupFailureKind::TimeoutError,
};

View File

@@ -1,3 +1,5 @@
use std::usize;
use super::{LimitAlgorithm, Outcome, Sample};
/// Loss-based congestion avoidance.

View File

@@ -32,6 +32,8 @@ pub struct ClientFirstMessage<'a> {
pub bare: &'a str,
/// Channel binding mode.
pub cbind_flag: ChannelBinding<&'a str>,
/// (Client username)[<https://github.com/postgres/postgres/blob/94226d4506e66d6e7cbf/src/backend/libpq/auth-scram.c#L13>].
pub username: &'a str,
/// Client nonce.
pub nonce: &'a str,
}
@@ -56,14 +58,6 @@ impl<'a> ClientFirstMessage<'a> {
// In theory, these might be preceded by "reserved-mext" (i.e. "m=")
let username = parts.next()?.strip_prefix("n=")?;
// https://github.com/postgres/postgres/blob/f83908798f78c4cafda217ca875602c88ea2ae28/src/backend/libpq/auth-scram.c#L13-L14
if !username.is_empty() {
tracing::warn!(username, "scram username provided, but is not expected")
// TODO(conrad):
// return None;
}
let nonce = parts.next()?.strip_prefix("r=")?;
// Validate but ignore auth extensions
@@ -72,6 +66,7 @@ impl<'a> ClientFirstMessage<'a> {
Some(Self {
bare,
cbind_flag,
username,
nonce,
})
}
@@ -193,18 +188,19 @@ mod tests {
// (Almost) real strings captured during debug sessions
let cases = [
(NotSupportedClient, "n,,n=,r=t8JwklwKecDLwSsA72rHmVju"),
(NotSupportedServer, "y,,n=,r=t8JwklwKecDLwSsA72rHmVju"),
(NotSupportedClient, "n,,n=pepe,r=t8JwklwKecDLwSsA72rHmVju"),
(NotSupportedServer, "y,,n=pepe,r=t8JwklwKecDLwSsA72rHmVju"),
(
Required("tls-server-end-point"),
"p=tls-server-end-point,,n=,r=t8JwklwKecDLwSsA72rHmVju",
"p=tls-server-end-point,,n=pepe,r=t8JwklwKecDLwSsA72rHmVju",
),
];
for (cb, input) in cases {
let msg = ClientFirstMessage::parse(input).unwrap();
assert_eq!(msg.bare, "n=,r=t8JwklwKecDLwSsA72rHmVju");
assert_eq!(msg.bare, "n=pepe,r=t8JwklwKecDLwSsA72rHmVju");
assert_eq!(msg.username, "pepe");
assert_eq!(msg.nonce, "t8JwklwKecDLwSsA72rHmVju");
assert_eq!(msg.cbind_flag, cb);
}
@@ -212,13 +208,14 @@ mod tests {
#[test]
fn parse_client_first_message_with_invalid_gs2_authz() {
assert!(ClientFirstMessage::parse("n,authzid,n=,r=nonce").is_none())
assert!(ClientFirstMessage::parse("n,authzid,n=user,r=nonce").is_none())
}
#[test]
fn parse_client_first_message_with_extra_params() {
let msg = ClientFirstMessage::parse("n,,n=,r=nonce,a=foo,b=bar,c=baz").unwrap();
assert_eq!(msg.bare, "n=,r=nonce,a=foo,b=bar,c=baz");
let msg = ClientFirstMessage::parse("n,,n=user,r=nonce,a=foo,b=bar,c=baz").unwrap();
assert_eq!(msg.bare, "n=user,r=nonce,a=foo,b=bar,c=baz");
assert_eq!(msg.username, "user");
assert_eq!(msg.nonce, "nonce");
assert_eq!(msg.cbind_flag, ChannelBinding::NotSupportedClient);
}
@@ -226,9 +223,9 @@ mod tests {
#[test]
fn parse_client_first_message_with_extra_params_invalid() {
// must be of the form `<ascii letter>=<...>`
assert!(ClientFirstMessage::parse("n,,n=,r=nonce,abc=foo").is_none());
assert!(ClientFirstMessage::parse("n,,n=,r=nonce,1=foo").is_none());
assert!(ClientFirstMessage::parse("n,,n=,r=nonce,a").is_none());
assert!(ClientFirstMessage::parse("n,,n=user,r=nonce,abc=foo").is_none());
assert!(ClientFirstMessage::parse("n,,n=user,r=nonce,1=foo").is_none());
assert!(ClientFirstMessage::parse("n,,n=user,r=nonce,a").is_none());
}
#[test]

View File

@@ -27,14 +27,14 @@ use rand::SeedableRng;
pub use reqwest_middleware::{ClientWithMiddleware, Error};
pub use reqwest_retry::{policies::ExponentialBackoff, RetryTransientMiddleware};
use tokio::time::timeout;
use tokio_rustls::{server::TlsStream, TlsAcceptor};
use tokio_rustls::TlsAcceptor;
use tokio_util::task::TaskTracker;
use crate::cancellation::CancellationHandlerMain;
use crate::config::ProxyConfig;
use crate::context::RequestMonitoring;
use crate::metrics::Metrics;
use crate::protocol2::{read_proxy_protocol, ChainRW};
use crate::protocol2::read_proxy_protocol;
use crate::proxy::run_until_cancelled;
use crate::rate_limiter::EndpointRateLimiter;
use crate::serverless::backend::PoolingBackend;
@@ -102,6 +102,8 @@ pub async fn task_main(
let connections = tokio_util::task::task_tracker::TaskTracker::new();
connections.close(); // allows `connections.wait to complete`
let server = Builder::new(TokioExecutor::new());
while let Some(res) = run_until_cancelled(ws_listener.accept(), &cancellation_token).await {
let (conn, peer_addr) = res.context("could not accept TCP stream")?;
if let Err(e) = conn.set_nodelay(true) {
@@ -125,50 +127,24 @@ pub async fn task_main(
}
let conn_token = cancellation_token.child_token();
let tls_acceptor = tls_acceptor.clone();
let backend = backend.clone();
let connections2 = connections.clone();
let cancellation_handler = cancellation_handler.clone();
let endpoint_rate_limiter = endpoint_rate_limiter.clone();
connections.spawn(
async move {
let conn_token2 = conn_token.clone();
let _cancel_guard = config.http_config.cancel_set.insert(conn_id, conn_token2);
let conn = connection_handler(
config,
backend.clone(),
connections.clone(),
cancellation_handler.clone(),
endpoint_rate_limiter.clone(),
conn_token.clone(),
server.clone(),
tls_acceptor.clone(),
conn,
peer_addr,
)
.instrument(http_conn_span);
let session_id = uuid::Uuid::new_v4();
let _gauge = Metrics::get()
.proxy
.client_connections
.guard(crate::metrics::Protocol::Http);
let startup_result = Box::pin(connection_startup(
config,
tls_acceptor,
session_id,
conn,
peer_addr,
))
.await;
let Some((conn, peer_addr)) = startup_result else {
return;
};
Box::pin(connection_handler(
config,
backend,
connections2,
cancellation_handler,
endpoint_rate_limiter,
conn_token,
conn,
peer_addr,
session_id,
))
.await;
}
.instrument(http_conn_span),
);
connections.spawn(async move {
let _cancel_guard = config.http_config.cancel_set.insert(conn_id, conn_token);
conn.await
});
}
connections.wait().await;
@@ -176,22 +152,40 @@ pub async fn task_main(
Ok(())
}
/// Handles the TCP startup lifecycle.
/// Handles the TCP lifecycle.
///
/// 1. Parses PROXY protocol V2
/// 2. Handles TLS handshake
async fn connection_startup(
config: &ProxyConfig,
/// 3. Handles HTTP connection
/// 1. With graceful shutdowns
/// 2. With graceful request cancellation with connection failure
/// 3. With websocket upgrade support.
#[allow(clippy::too_many_arguments)]
async fn connection_handler(
config: &'static ProxyConfig,
backend: Arc<PoolingBackend>,
connections: TaskTracker,
cancellation_handler: Arc<CancellationHandlerMain>,
endpoint_rate_limiter: Arc<EndpointRateLimiter>,
cancellation_token: CancellationToken,
server: Builder<TokioExecutor>,
tls_acceptor: TlsAcceptor,
session_id: uuid::Uuid,
conn: TcpStream,
peer_addr: SocketAddr,
) -> Option<(TlsStream<ChainRW<TcpStream>>, IpAddr)> {
) {
let session_id = uuid::Uuid::new_v4();
let _gauge = Metrics::get()
.proxy
.client_connections
.guard(crate::metrics::Protocol::Http);
// handle PROXY protocol
let (conn, peer) = match read_proxy_protocol(conn).await {
Ok(c) => c,
Err(e) => {
tracing::error!(?session_id, %peer_addr, "failed to accept TCP connection: invalid PROXY protocol V2 header: {e:#}");
return None;
return;
}
};
@@ -214,7 +208,7 @@ async fn connection_startup(
Metrics::get().proxy.tls_handshake_failures.inc();
}
warn!(?session_id, %peer_addr, "failed to accept TLS connection: {e:?}");
return None;
return;
}
// The handshake timed out
Err(e) => {
@@ -222,36 +216,16 @@ async fn connection_startup(
Metrics::get().proxy.tls_handshake_failures.inc();
}
warn!(?session_id, %peer_addr, "failed to accept TLS connection: {e:?}");
return None;
return;
}
};
Some((conn, peer_addr))
}
/// Handles HTTP connection
/// 1. With graceful shutdowns
/// 2. With graceful request cancellation with connection failure
/// 3. With websocket upgrade support.
#[allow(clippy::too_many_arguments)]
async fn connection_handler(
config: &'static ProxyConfig,
backend: Arc<PoolingBackend>,
connections: TaskTracker,
cancellation_handler: Arc<CancellationHandlerMain>,
endpoint_rate_limiter: Arc<EndpointRateLimiter>,
cancellation_token: CancellationToken,
conn: TlsStream<ChainRW<TcpStream>>,
peer_addr: IpAddr,
session_id: uuid::Uuid,
) {
let session_id = AtomicTake::new(session_id);
// Cancel all current inflight HTTP requests if the HTTP connection is closed.
let http_cancellation_token = CancellationToken::new();
let _cancel_connection = http_cancellation_token.clone().drop_guard();
let server = Builder::new(TokioExecutor::new());
let conn = server.serve_connection_with_upgrades(
hyper_util::rt::TokioIo::new(conn),
hyper1::service::service_fn(move |req: hyper1::Request<Incoming>| {

View File

@@ -104,7 +104,7 @@ impl PoolingBackend {
) -> Result<Client<tokio_postgres::Client>, HttpConnError> {
let maybe_client = if !force_new {
info!("pool: looking for an existing connection");
self.pool.get(ctx, &conn_info)?
self.pool.get(ctx, &conn_info).await?
} else {
info!("pool: pool is disabled");
None

View File

@@ -375,7 +375,7 @@ impl<C: ClientInnerExt> GlobalConnPool<C> {
}
}
pub fn get(
pub async fn get(
self: &Arc<Self>,
ctx: &mut RequestMonitoring,
conn_info: &ConnInfo,

View File

@@ -533,31 +533,27 @@ async fn handle_inner(
return Err(SqlOverHttpError::RequestTooLarge);
}
let fetch_and_process_request = Box::pin(
async {
let body = request.into_body().collect().await?.to_bytes();
info!(length = body.len(), "request payload read");
let payload: Payload = serde_json::from_slice(&body)?;
Ok::<Payload, ReadPayloadError>(payload) // Adjust error type accordingly
}
.map_err(SqlOverHttpError::from),
);
let fetch_and_process_request = async {
let body = request.into_body().collect().await?.to_bytes();
info!(length = body.len(), "request payload read");
let payload: Payload = serde_json::from_slice(&body)?;
Ok::<Payload, ReadPayloadError>(payload) // Adjust error type accordingly
}
.map_err(SqlOverHttpError::from);
let authenticate_and_connect = Box::pin(
async {
let keys = backend
.authenticate(ctx, &config.authentication_config, &conn_info)
.await?;
let client = backend
.connect_to_compute(ctx, conn_info, keys, !allow_pool)
.await?;
// not strictly necessary to mark success here,
// but it's just insurance for if we forget it somewhere else
ctx.latency_timer.success();
Ok::<_, HttpConnError>(client)
}
.map_err(SqlOverHttpError::from),
);
let authenticate_and_connect = async {
let keys = backend
.authenticate(ctx, &config.authentication_config, &conn_info)
.await?;
let client = backend
.connect_to_compute(ctx, conn_info, keys, !allow_pool)
.await?;
// not strictly necessary to mark success here,
// but it's just insurance for if we forget it somewhere else
ctx.latency_timer.success();
Ok::<_, HttpConnError>(client)
}
.map_err(SqlOverHttpError::from);
let (payload, mut client) = match run_until_cancelled(
// Run both operations in parallel

View File

@@ -141,7 +141,7 @@ pub async fn serve_websocket(
.client_connections
.guard(crate::metrics::Protocol::Ws);
let res = Box::pin(handle_client(
let res = handle_client(
config,
&mut ctx,
cancellation_handler,
@@ -149,7 +149,7 @@ pub async fn serve_websocket(
ClientMode::Websockets { hostname },
endpoint_rate_limiter,
conn_gauge,
))
)
.await;
match res {

View File

@@ -1,5 +1,5 @@
[toolchain]
channel = "1.79.0"
channel = "1.78.0"
profile = "default"
# The default profile includes rustc, rust-std, cargo, rust-docs, rustfmt and clippy.
# https://rust-lang.github.io/rustup/concepts/profiles.html

View File

@@ -46,7 +46,6 @@ tokio = { workspace = true, features = ["fs"] }
tokio-util = { workspace = true }
tokio-io-timeout.workspace = true
tokio-postgres.workspace = true
tokio-tar.workspace = true
toml_edit.workspace = true
tracing.workspace = true
url.workspace = true

View File

@@ -13,9 +13,7 @@ use tokio::runtime::Handle;
use tokio::signal::unix::{signal, SignalKind};
use tokio::task::JoinError;
use toml_edit::Document;
use utils::logging::SecretString;
use std::env::{var, VarError};
use std::fs::{self, File};
use std::io::{ErrorKind, Write};
use std::str::FromStr;
@@ -289,22 +287,6 @@ async fn main() -> anyhow::Result<()> {
}
};
// Load JWT auth token to connect to other safekeepers for pull_timeline.
let sk_auth_token = match var("SAFEKEEPER_AUTH_TOKEN") {
Ok(v) => {
info!("loaded JWT token for authentication with safekeepers");
Some(SecretString::from(v))
}
Err(VarError::NotPresent) => {
info!("no JWT token for authentication with safekeepers detected");
None
}
Err(_) => {
warn!("JWT token for authentication with safekeepers is not unicode");
None
}
};
let conf = SafeKeeperConf {
workdir,
my_id: id,
@@ -325,7 +307,6 @@ async fn main() -> anyhow::Result<()> {
pg_auth,
pg_tenant_only_auth,
http_auth,
sk_auth_token,
current_thread_runtime: args.current_thread_runtime,
walsenders_keep_horizon: args.walsenders_keep_horizon,
partial_backup_enabled: args.partial_backup_enabled,

View File

@@ -23,7 +23,7 @@ pub const SK_MAGIC: u32 = 0xcafeceefu32;
pub const SK_FORMAT_VERSION: u32 = 8;
// contains persistent metadata for safekeeper
pub const CONTROL_FILE_NAME: &str = "safekeeper.control";
const CONTROL_FILE_NAME: &str = "safekeeper.control";
// needed to atomically update the state using `rename`
const CONTROL_FILE_NAME_PARTIAL: &str = "safekeeper.control.partial";
pub const CHECKSUM_SIZE: usize = std::mem::size_of::<u32>();

View File

@@ -1,139 +0,0 @@
//! Safekeeper http client.
//!
//! Partially copied from pageserver client; some parts might be better to be
//! united.
//!
//! It would be also good to move it out to separate crate, but this needs
//! duplication of internal-but-reported structs like WalSenderState, ServerInfo
//! etc.
use reqwest::{IntoUrl, Method, StatusCode};
use utils::{
http::error::HttpErrorBody,
id::{TenantId, TimelineId},
logging::SecretString,
};
use super::routes::TimelineStatus;
#[derive(Debug, Clone)]
pub struct Client {
mgmt_api_endpoint: String,
authorization_header: Option<SecretString>,
client: reqwest::Client,
}
#[derive(thiserror::Error, Debug)]
pub enum Error {
/// Failed to receive body (reqwest error).
#[error("receive body: {0}")]
ReceiveBody(reqwest::Error),
/// Status is not ok, but failed to parse body as `HttpErrorBody`.
#[error("receive error body: {0}")]
ReceiveErrorBody(String),
/// Status is not ok; parsed error in body as `HttpErrorBody`.
#[error("safekeeper API: {1}")]
ApiError(StatusCode, String),
}
pub type Result<T> = std::result::Result<T, Error>;
pub trait ResponseErrorMessageExt: Sized {
fn error_from_body(self) -> impl std::future::Future<Output = Result<Self>> + Send;
}
/// If status is not ok, try to extract error message from the body.
impl ResponseErrorMessageExt for reqwest::Response {
async fn error_from_body(self) -> Result<Self> {
let status = self.status();
if !(status.is_client_error() || status.is_server_error()) {
return Ok(self);
}
let url = self.url().to_owned();
Err(match self.json::<HttpErrorBody>().await {
Ok(HttpErrorBody { msg }) => Error::ApiError(status, msg),
Err(_) => {
Error::ReceiveErrorBody(format!("http error ({}) at {}.", status.as_u16(), url))
}
})
}
}
impl Client {
pub fn new(mgmt_api_endpoint: String, jwt: Option<SecretString>) -> Self {
Self::from_client(reqwest::Client::new(), mgmt_api_endpoint, jwt)
}
pub fn from_client(
client: reqwest::Client,
mgmt_api_endpoint: String,
jwt: Option<SecretString>,
) -> Self {
Self {
mgmt_api_endpoint,
authorization_header: jwt
.map(|jwt| SecretString::from(format!("Bearer {}", jwt.get_contents()))),
client,
}
}
pub async fn timeline_status(
&self,
tenant_id: TenantId,
timeline_id: TimelineId,
) -> Result<TimelineStatus> {
let uri = format!(
"{}/v1/tenant/{}/timeline/{}",
self.mgmt_api_endpoint, tenant_id, timeline_id
);
let resp = self.get(&uri).await?;
resp.json().await.map_err(Error::ReceiveBody)
}
pub async fn snapshot(
&self,
tenant_id: TenantId,
timeline_id: TimelineId,
) -> Result<reqwest::Response> {
let uri = format!(
"{}/v1/tenant/{}/timeline/{}/snapshot",
self.mgmt_api_endpoint, tenant_id, timeline_id
);
self.get(&uri).await
}
async fn get<U: IntoUrl>(&self, uri: U) -> Result<reqwest::Response> {
self.request(Method::GET, uri, ()).await
}
/// Send the request and check that the status code is good.
async fn request<B: serde::Serialize, U: reqwest::IntoUrl>(
&self,
method: Method,
uri: U,
body: B,
) -> Result<reqwest::Response> {
let res = self.request_noerror(method, uri, body).await?;
let response = res.error_from_body().await?;
Ok(response)
}
/// Just send the request.
async fn request_noerror<B: serde::Serialize, U: reqwest::IntoUrl>(
&self,
method: Method,
uri: U,
body: B,
) -> Result<reqwest::Response> {
let req = self.client.request(method, uri);
let req = if let Some(value) = &self.authorization_header {
req.header(reqwest::header::AUTHORIZATION, value.get_contents())
} else {
req
};
req.json(&body).send().await.map_err(Error::ReceiveBody)
}
}

View File

@@ -1,4 +1,3 @@
pub mod client;
pub mod routes;
pub use routes::make_router;

View File

@@ -1,25 +1,38 @@
use hyper::{Body, Request, Response, StatusCode, Uri};
use once_cell::sync::Lazy;
use postgres_ffi::WAL_SEGMENT_SIZE;
use safekeeper_api::models::{SkTimelineInfo, TimelineCopyRequest};
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet};
use std::fmt;
use std::io::Write as _;
use std::str::FromStr;
use std::sync::Arc;
use storage_broker::proto::SafekeeperTimelineInfo;
use storage_broker::proto::TenantTimelineId as ProtoTenantTimelineId;
use tokio::sync::mpsc;
use tokio::task;
use tokio_stream::wrappers::ReceiverStream;
use tokio::fs::File;
use tokio::io::AsyncReadExt;
use tokio_util::sync::CancellationToken;
use tracing::{info_span, Instrument};
use utils::failpoint_support::failpoints_handler;
use utils::http::endpoint::{prometheus_metrics_handler, request_span, ChannelWriter};
use utils::http::request::parse_query_param;
use postgres_ffi::WAL_SEGMENT_SIZE;
use safekeeper_api::models::TimelineCreateRequest;
use safekeeper_api::models::{SkTimelineInfo, TimelineCopyRequest};
use std::io::Write as _;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tracing::{info_span, Instrument};
use utils::http::endpoint::{prometheus_metrics_handler, request_span, ChannelWriter};
use crate::debug_dump::TimelineDigestRequest;
use crate::receive_wal::WalReceiverState;
use crate::safekeeper::Term;
use crate::safekeeper::{ServerInfo, TermLsn};
use crate::send_wal::WalSenderState;
use crate::timeline::PeerInfo;
use crate::{copy_timeline, debug_dump, patch_control_file, pull_timeline};
use crate::timelines_global_map::TimelineDeleteForceResult;
use crate::GlobalTimelines;
use crate::SafeKeeperConf;
use utils::{
auth::SwappableJwtAuth,
http::{
@@ -33,16 +46,7 @@ use utils::{
lsn::Lsn,
};
use crate::debug_dump::TimelineDigestRequest;
use crate::receive_wal::WalReceiverState;
use crate::safekeeper::Term;
use crate::safekeeper::{ServerInfo, TermLsn};
use crate::send_wal::WalSenderState;
use crate::timeline::PeerInfo;
use crate::timelines_global_map::TimelineDeleteForceResult;
use crate::GlobalTimelines;
use crate::SafeKeeperConf;
use crate::{copy_timeline, debug_dump, patch_control_file, pull_timeline};
use super::models::TimelineCreateRequest;
#[derive(Debug, Serialize)]
struct SafekeeperStatus {
@@ -195,50 +199,13 @@ async fn timeline_pull_handler(mut request: Request<Body>) -> Result<Response<Bo
check_permission(&request, None)?;
let data: pull_timeline::Request = json_request(&mut request).await?;
let conf = get_conf(&request);
let resp = pull_timeline::handle_request(data, conf.sk_auth_token.clone())
let resp = pull_timeline::handle_request(data)
.await
.map_err(ApiError::InternalServerError)?;
json_response(StatusCode::OK, resp)
}
/// Stream tar archive with all timeline data.
async fn timeline_snapshot_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
let ttid = TenantTimelineId::new(
parse_request_param(&request, "tenant_id")?,
parse_request_param(&request, "timeline_id")?,
);
check_permission(&request, Some(ttid.tenant_id))?;
let tli = GlobalTimelines::get(ttid).map_err(ApiError::from)?;
// Note: with evicted timelines it should work better then de-evict them and
// stream; probably start_snapshot would copy partial s3 file to dest path
// and stream control file, or return FullAccessTimeline if timeline is not
// evicted.
let tli = tli
.full_access_guard()
.await
.map_err(ApiError::InternalServerError)?;
// To stream the body use wrap_stream which wants Stream of Result<Bytes>,
// so create the chan and write to it in another task.
let (tx, rx) = mpsc::channel(1);
task::spawn(pull_timeline::stream_snapshot(tli, tx));
let rx_stream = ReceiverStream::new(rx);
let body = Body::wrap_stream(rx_stream);
let response = Response::builder()
.status(200)
.header(hyper::header::CONTENT_TYPE, "application/octet-stream")
.body(body)
.unwrap();
Ok(response)
}
async fn timeline_copy_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
check_permission(&request, None)?;
@@ -293,6 +260,41 @@ async fn timeline_digest_handler(request: Request<Body>) -> Result<Response<Body
json_response(StatusCode::OK, response)
}
/// Download a file from the timeline directory.
// TODO: figure out a better way to copy files between safekeepers
async fn timeline_files_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
let ttid = TenantTimelineId::new(
parse_request_param(&request, "tenant_id")?,
parse_request_param(&request, "timeline_id")?,
);
check_permission(&request, Some(ttid.tenant_id))?;
let filename: String = parse_request_param(&request, "filename")?;
let tli = GlobalTimelines::get(ttid).map_err(ApiError::from)?;
let tli = tli
.full_access_guard()
.await
.map_err(ApiError::InternalServerError)?;
let filepath = tli.get_timeline_dir().join(filename);
let mut file = File::open(&filepath)
.await
.map_err(|e| ApiError::InternalServerError(e.into()))?;
let mut content = Vec::new();
// TODO: don't store files in memory
file.read_to_end(&mut content)
.await
.map_err(|e| ApiError::InternalServerError(e.into()))?;
Response::builder()
.status(StatusCode::OK)
.header("Content-Type", "application/octet-stream")
.body(Body::from(content))
.map_err(|e| ApiError::InternalServerError(e.into()))
}
/// Force persist control file.
async fn timeline_checkpoint_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
check_permission(&request, None)?;
@@ -564,13 +566,13 @@ pub fn make_router(conf: SafeKeeperConf) -> RouterBuilder<hyper::Body, ApiError>
.delete("/v1/tenant/:tenant_id", |r| {
request_span(r, tenant_delete_handler)
})
.get(
"/v1/tenant/:tenant_id/timeline/:timeline_id/snapshot",
|r| request_span(r, timeline_snapshot_handler),
)
.post("/v1/pull_timeline", |r| {
request_span(r, timeline_pull_handler)
})
.get(
"/v1/tenant/:tenant_id/timeline/:timeline_id/file/:filename",
|r| request_span(r, timeline_files_handler),
)
.post(
"/v1/tenant/:tenant_id/timeline/:source_timeline_id/copy",
|r| request_span(r, timeline_copy_handler),

View File

@@ -7,7 +7,7 @@ use tokio::runtime::Runtime;
use std::time::Duration;
use storage_broker::Uri;
use utils::{auth::SwappableJwtAuth, id::NodeId, logging::SecretString};
use utils::{auth::SwappableJwtAuth, id::NodeId};
mod auth;
pub mod broker;
@@ -78,8 +78,6 @@ pub struct SafeKeeperConf {
pub pg_auth: Option<Arc<JwtAuth>>,
pub pg_tenant_only_auth: Option<Arc<JwtAuth>>,
pub http_auth: Option<Arc<SwappableJwtAuth>>,
/// JWT token to connect to other safekeepers with.
pub sk_auth_token: Option<SecretString>,
pub current_thread_runtime: bool,
pub walsenders_keep_horizon: bool,
pub partial_backup_enabled: bool,
@@ -116,7 +114,6 @@ impl SafeKeeperConf {
pg_auth: None,
pg_tenant_only_auth: None,
http_auth: None,
sk_auth_token: None,
heartbeat_timeout: Duration::new(5, 0),
max_offloader_lag_bytes: defaults::DEFAULT_MAX_OFFLOADER_LAG_BYTES,
current_thread_runtime: false,

View File

@@ -1,244 +1,28 @@
use anyhow::{anyhow, bail, Context, Result};
use bytes::Bytes;
use std::sync::Arc;
use camino::Utf8PathBuf;
use camino_tempfile::Utf8TempDir;
use chrono::{DateTime, Utc};
use futures::{SinkExt, StreamExt, TryStreamExt};
use postgres_ffi::{XLogFileName, XLogSegNo, PG_TLI};
use serde::{Deserialize, Serialize};
use std::{
cmp::min,
io::{self, ErrorKind},
sync::Arc,
};
use tokio::{
fs::{File, OpenOptions},
io::AsyncWrite,
sync::mpsc,
task,
};
use tokio_tar::{Archive, Builder};
use tokio_util::{
io::{CopyToBytes, SinkWriter},
sync::PollSender,
};
use tracing::{error, info, instrument};
use crate::{
control_file::{self, CONTROL_FILE_NAME},
debug_dump,
http::{
client::{self, Client},
routes::TimelineStatus,
},
safekeeper::Term,
timeline::{get_tenant_dir, get_timeline_dir, FullAccessTimeline, Timeline, TimelineError},
wal_storage::{self, open_wal_file, Storage},
GlobalTimelines, SafeKeeperConf,
};
use anyhow::{bail, Context, Result};
use tokio::io::AsyncWriteExt;
use tracing::info;
use utils::{
crashsafe::{durable_rename, fsync_async_opt},
id::{TenantId, TenantTimelineId, TimelineId},
logging::SecretString,
lsn::Lsn,
pausable_failpoint,
};
/// Stream tar archive of timeline to tx.
#[instrument(name = "snapshot", skip_all, fields(ttid = %tli.ttid))]
pub async fn stream_snapshot(tli: FullAccessTimeline, tx: mpsc::Sender<Result<Bytes>>) {
if let Err(e) = stream_snapshot_guts(tli, tx.clone()).await {
// Error type/contents don't matter as they won't can't reach the client
// (hyper likely doesn't do anything with it), but http stream will be
// prematurely terminated. It would be nice to try to send the error in
// trailers though.
tx.send(Err(anyhow!("snapshot failed"))).await.ok();
error!("snapshot failed: {:#}", e);
}
}
use crate::{
control_file, debug_dump,
http::routes::TimelineStatus,
timeline::{get_tenant_dir, get_timeline_dir, Timeline, TimelineError},
wal_storage::{self, Storage},
GlobalTimelines, SafeKeeperConf,
};
/// State needed while streaming the snapshot.
pub struct SnapshotContext {
pub from_segno: XLogSegNo, // including
pub upto_segno: XLogSegNo, // including
pub term: Term,
pub last_log_term: Term,
pub flush_lsn: Lsn,
pub wal_seg_size: usize,
// used to remove WAL hold off in Drop.
pub tli: FullAccessTimeline,
}
impl Drop for SnapshotContext {
fn drop(&mut self) {
let tli = self.tli.clone();
task::spawn(async move {
let mut shared_state = tli.write_shared_state().await;
shared_state.wal_removal_on_hold = false;
});
}
}
pub async fn stream_snapshot_guts(
tli: FullAccessTimeline,
tx: mpsc::Sender<Result<Bytes>>,
) -> Result<()> {
// tokio-tar wants Write implementor, but we have mpsc tx <Result<Bytes>>;
// use SinkWriter as a Write impl. That is,
// - create Sink from the tx. It returns PollSendError if chan is closed.
let sink = PollSender::new(tx);
// - SinkWriter needs sink error to be io one, map it.
let sink_io_err = sink.sink_map_err(|_| io::Error::from(ErrorKind::BrokenPipe));
// - SinkWriter wants sink type to be just Bytes, not Result<Bytes>, so map
// it with with(). Note that with() accepts async function which we don't
// need and allows the map to fail, which we don't need either, but hence
// two Oks.
let oksink = sink_io_err.with(|b: Bytes| async { io::Result::Ok(Result::Ok(b)) });
// - SinkWriter (not surprisingly) wants sink of &[u8], not bytes, so wrap
// into CopyToBytes. This is a data copy.
let copy_to_bytes = CopyToBytes::new(oksink);
let mut writer = SinkWriter::new(copy_to_bytes);
let pinned_writer = std::pin::pin!(writer);
// Note that tokio_tar append_* funcs use tokio::io::copy with 8KB buffer
// which is also likely suboptimal.
let mut ar = Builder::new_non_terminated(pinned_writer);
let bctx = tli.start_snapshot(&mut ar).await?;
pausable_failpoint!("sk-snapshot-after-list-pausable");
let tli_dir = tli.get_timeline_dir();
info!(
"sending {} segments [{:#X}-{:#X}], term={}, last_log_term={}, flush_lsn={}",
bctx.upto_segno - bctx.from_segno + 1,
bctx.from_segno,
bctx.upto_segno,
bctx.term,
bctx.last_log_term,
bctx.flush_lsn,
);
for segno in bctx.from_segno..=bctx.upto_segno {
let (mut sf, is_partial) = open_wal_file(&tli_dir, segno, bctx.wal_seg_size).await?;
let mut wal_file_name = XLogFileName(PG_TLI, segno, bctx.wal_seg_size);
if is_partial {
wal_file_name.push_str(".partial");
}
ar.append_file(&wal_file_name, &mut sf).await?;
}
// Do the term check before ar.finish to make archive corrupted in case of
// term change. Client shouldn't ignore abrupt stream end, but to be sure.
tli.finish_snapshot(&bctx).await?;
ar.finish().await?;
Ok(())
}
impl FullAccessTimeline {
/// Start streaming tar archive with timeline:
/// 1) stream control file under lock;
/// 2) hold off WAL removal;
/// 3) collect SnapshotContext to understand which WAL segments should be
/// streamed.
///
/// Snapshot streams data up to flush_lsn. To make this safe, we must check
/// that term doesn't change during the procedure, or we risk sending mix of
/// WAL from different histories. Term is remembered in the SnapshotContext
/// and checked in finish_snapshot. Note that in the last segment some WAL
/// higher than flush_lsn set here might be streamed; that's fine as long as
/// terms doesn't change.
///
/// Alternatively we could send only up to commit_lsn to get some valid
/// state which later will be recovered by compute, in this case term check
/// is not needed, but we likely don't want that as there might be no
/// compute which could perform the recovery.
///
/// When returned SnapshotContext is dropped WAL hold is removed.
async fn start_snapshot<W: AsyncWrite + Unpin + Send>(
&self,
ar: &mut tokio_tar::Builder<W>,
) -> Result<SnapshotContext> {
let mut shared_state = self.write_shared_state().await;
let cf_path = self.get_timeline_dir().join(CONTROL_FILE_NAME);
let mut cf = File::open(cf_path).await?;
ar.append_file(CONTROL_FILE_NAME, &mut cf).await?;
// We need to stream since the oldest segment someone (s3 or pageserver)
// still needs. This duplicates calc_horizon_lsn logic.
//
// We know that WAL wasn't removed up to this point because it cannot be
// removed further than `backup_lsn`. Since we're holding shared_state
// lock and setting `wal_removal_on_hold` later, it guarantees that WAL
// won't be removed until we're done.
let from_lsn = min(
shared_state.sk.state.remote_consistent_lsn,
shared_state.sk.state.backup_lsn,
);
if from_lsn == Lsn::INVALID {
// this is possible if snapshot is called before handling first
// elected message
bail!("snapshot is called on uninitialized timeline");
}
let from_segno = from_lsn.segment_number(shared_state.get_wal_seg_size());
let term = shared_state.sk.get_term();
let last_log_term = shared_state.sk.get_last_log_term();
let flush_lsn = shared_state.sk.flush_lsn();
let upto_segno = flush_lsn.segment_number(shared_state.get_wal_seg_size());
// have some limit on max number of segments as a sanity check
const MAX_ALLOWED_SEGS: u64 = 1000;
let num_segs = upto_segno - from_segno + 1;
if num_segs > MAX_ALLOWED_SEGS {
bail!(
"snapshot is called on timeline with {} segments, but the limit is {}",
num_segs,
MAX_ALLOWED_SEGS
);
}
// Prevent WAL removal while we're streaming data.
//
// Since this a flag, not a counter just bail out if already set; we
// shouldn't need concurrent snapshotting.
if shared_state.wal_removal_on_hold {
bail!("wal_removal_on_hold is already true");
}
shared_state.wal_removal_on_hold = true;
let bctx = SnapshotContext {
from_segno,
upto_segno,
term,
last_log_term,
flush_lsn,
wal_seg_size: shared_state.get_wal_seg_size(),
tli: self.clone(),
};
Ok(bctx)
}
/// Finish snapshotting: check that term(s) hasn't changed.
///
/// Note that WAL gc hold off is removed in Drop of SnapshotContext to not
/// forget this if snapshotting fails mid the way.
pub async fn finish_snapshot(&self, bctx: &SnapshotContext) -> Result<()> {
let shared_state = self.read_shared_state().await;
let term = shared_state.sk.get_term();
let last_log_term = shared_state.sk.get_last_log_term();
// There are some cases to relax this check (e.g. last_log_term might
// change, but as long as older history is strictly part of new that's
// fine), but there is no need to do it.
if bctx.term != term || bctx.last_log_term != last_log_term {
bail!("term(s) changed during snapshot: were term={}, last_log_term={}, now term={}, last_log_term={}",
bctx.term, bctx.last_log_term, term, last_log_term);
}
Ok(())
}
}
/// pull_timeline request body.
/// Info about timeline on safekeeper ready for reporting.
#[derive(Debug, Serialize, Deserialize)]
pub struct Request {
pub tenant_id: TenantId,
@@ -264,10 +48,7 @@ pub struct DebugDumpResponse {
}
/// Find the most advanced safekeeper and pull timeline from it.
pub async fn handle_request(
request: Request,
sk_auth_token: Option<SecretString>,
) -> Result<Response> {
pub async fn handle_request(request: Request) -> Result<Response> {
let existing_tli = GlobalTimelines::get(TenantTimelineId::new(
request.tenant_id,
request.timeline_id,
@@ -276,26 +57,28 @@ pub async fn handle_request(
bail!("Timeline {} already exists", request.timeline_id);
}
let client = reqwest::Client::new();
let http_hosts = request.http_hosts.clone();
// Figure out statuses of potential donors.
let responses: Vec<Result<TimelineStatus, client::Error>> =
futures::future::join_all(http_hosts.iter().map(|url| async {
let cclient = Client::new(url.clone(), sk_auth_token.clone());
let info = cclient
.timeline_status(request.tenant_id, request.timeline_id)
.await?;
Ok(info)
}))
.await;
// Send request to /v1/tenant/:tenant_id/timeline/:timeline_id
let responses = futures::future::join_all(http_hosts.iter().map(|url| {
let url = format!(
"{}/v1/tenant/{}/timeline/{}",
url, request.tenant_id, request.timeline_id
);
client.get(url).send()
}))
.await;
let mut statuses = Vec::new();
for (i, response) in responses.into_iter().enumerate() {
let status = response.context(format!("fetching status from {}", http_hosts[i]))?;
let response = response.context(format!("Failed to get status from {}", http_hosts[i]))?;
let status: crate::http::routes::TimelineStatus = response.json().await?;
statuses.push((status, i));
}
// Find the most advanced safekeeper
// TODO: current logic may be wrong, fix it later
let (status, i) = statuses
.into_iter()
.max_by_key(|(status, _)| {
@@ -311,14 +94,10 @@ pub async fn handle_request(
assert!(status.tenant_id == request.tenant_id);
assert!(status.timeline_id == request.timeline_id);
pull_timeline(status, safekeeper_host, sk_auth_token).await
pull_timeline(status, safekeeper_host).await
}
async fn pull_timeline(
status: TimelineStatus,
host: String,
sk_auth_token: Option<SecretString>,
) -> Result<Response> {
async fn pull_timeline(status: TimelineStatus, host: String) -> Result<Response> {
let ttid = TenantTimelineId::new(status.tenant_id, status.timeline_id);
info!(
"pulling timeline {} from safekeeper {}, commit_lsn={}, flush_lsn={}, term={}, epoch={}",
@@ -332,53 +111,95 @@ async fn pull_timeline(
let conf = &GlobalTimelines::get_global_config();
let (_tmp_dir, tli_dir_path) = create_temp_timeline_dir(conf, ttid).await?;
let client = reqwest::Client::new();
// TODO: don't use debug dump, it should be used only in tests.
// This is a proof of concept, we should figure out a way
// to use scp without implementing it manually.
let client = Client::new(host.clone(), sk_auth_token.clone());
// Request stream with basebackup archive.
let bb_resp = client
.snapshot(status.tenant_id, status.timeline_id)
// Implementing our own scp over HTTP.
// At first, we need to fetch list of files from safekeeper.
let dump: DebugDumpResponse = client
.get(format!(
"{}/v1/debug_dump?dump_all=true&tenant_id={}&timeline_id={}",
host, status.tenant_id, status.timeline_id
))
.send()
.await?
.json()
.await?;
// Make Stream of Bytes from it...
let bb_stream = bb_resp.bytes_stream().map_err(std::io::Error::other);
// and turn it into StreamReader implementing AsyncRead.
let bb_reader = tokio_util::io::StreamReader::new(bb_stream);
if dump.timelines.len() != 1 {
bail!(
"expected to fetch single timeline, got {} timelines",
dump.timelines.len()
);
}
// Extract it on the fly to the disk. We don't use simple unpack() to fsync
// files.
let mut entries = Archive::new(bb_reader).entries()?;
while let Some(base_tar_entry) = entries.next().await {
let mut entry = base_tar_entry?;
let header = entry.header();
let file_path = header.path()?.into_owned();
match header.entry_type() {
tokio_tar::EntryType::Regular => {
let utf8_file_path =
Utf8PathBuf::from_path_buf(file_path).expect("non-Unicode path");
let dst_path = tli_dir_path.join(utf8_file_path);
let mut f = OpenOptions::new()
.create(true)
.truncate(true)
.write(true)
.open(&dst_path)
.await?;
tokio::io::copy(&mut entry, &mut f).await?;
// fsync the file
f.sync_all().await?;
}
_ => {
bail!(
"entry {} in backup tar archive is of unexpected type: {:?}",
file_path.display(),
header.entry_type()
);
}
let timeline = dump.timelines.into_iter().next().unwrap();
let disk_content = timeline.disk_content.ok_or(anyhow::anyhow!(
"timeline {} doesn't have disk content",
ttid
))?;
let mut filenames = disk_content
.files
.iter()
.map(|file| file.name.clone())
.collect::<Vec<_>>();
// Sort filenames to make sure we pull files in correct order
// After sorting, we should have:
// - 000000010000000000000001
// - ...
// - 000000010000000000000002.partial
// - safekeeper.control
filenames.sort();
// safekeeper.control should be the first file, so we need to move it to the beginning
let control_file_index = filenames
.iter()
.position(|name| name == "safekeeper.control")
.ok_or(anyhow::anyhow!("safekeeper.control not found"))?;
filenames.remove(control_file_index);
filenames.insert(0, "safekeeper.control".to_string());
pausable_failpoint!("sk-pull-timeline-after-list-pausable");
info!(
"downloading {} files from safekeeper {}",
filenames.len(),
host
);
let (_tmp_dir, tli_dir_path) = create_temp_timeline_dir(conf, ttid).await?;
// Note: some time happens between fetching list of files and fetching files themselves.
// It's possible that some files will be removed from safekeeper and we will fail to fetch them.
// This function will fail in this case, should be retried by the caller.
for filename in filenames {
let file_path = tli_dir_path.join(&filename);
// /v1/tenant/:tenant_id/timeline/:timeline_id/file/:filename
let http_url = format!(
"{}/v1/tenant/{}/timeline/{}/file/{}",
host, status.tenant_id, status.timeline_id, filename
);
let mut file = tokio::fs::File::create(&file_path).await?;
let mut response = client.get(&http_url).send().await?;
if response.status() != reqwest::StatusCode::OK {
bail!(
"pulling file {} failed: status is {}",
filename,
response.status()
);
}
while let Some(chunk) = response.chunk().await? {
file.write_all(&chunk).await?;
file.flush().await?;
}
}
// fsync temp timeline directory to remember its contents.
fsync_async_opt(&tli_dir_path, !conf.no_sync).await?;
// TODO: fsync?
// Let's create timeline from temp directory and verify that it's correct
let (commit_lsn, flush_lsn) = validate_temp_timeline(conf, ttid, &tli_dir_path).await?;
@@ -469,9 +290,7 @@ pub async fn load_temp_timeline(
ttid, tmp_path, timeline_path
);
tokio::fs::create_dir_all(get_tenant_dir(conf, &ttid.tenant_id)).await?;
// fsync tenant dir creation
fsync_async_opt(&conf.workdir, !conf.no_sync).await?;
durable_rename(tmp_path, &timeline_path, !conf.no_sync).await?;
tokio::fs::rename(tmp_path, &timeline_path).await?;
let tli = GlobalTimelines::load_timeline(&guard, ttid)
.await

View File

@@ -780,9 +780,6 @@ where
// Initializing backup_lsn is useful to avoid making backup think it should upload 0 segment.
state.backup_lsn = max(state.backup_lsn, state.timeline_start_lsn);
// similar for remote_consistent_lsn
state.remote_consistent_lsn =
max(state.remote_consistent_lsn, state.timeline_start_lsn);
state.acceptor_state.term_history = msg.term_history.clone();
self.state.finish_change(&state).await?;

View File

@@ -4,7 +4,7 @@
use anyhow::{anyhow, bail, Result};
use camino::Utf8PathBuf;
use serde::{Deserialize, Serialize};
use tokio::fs::{self};
use tokio::fs;
use tokio_util::sync::CancellationToken;
use utils::id::TenantId;
@@ -168,9 +168,6 @@ pub struct SharedState {
pub(crate) sk: SafeKeeper<control_file::FileStorage, wal_storage::PhysicalStorage>,
/// In memory list containing state of peers sent in latest messages from them.
pub(crate) peers_info: PeersInfo,
// True value hinders old WAL removal; this is used by snapshotting. We
// could make it a counter, but there is no need to.
pub(crate) wal_removal_on_hold: bool,
}
impl SharedState {
@@ -208,7 +205,6 @@ impl SharedState {
Ok(Self {
sk,
peers_info: PeersInfo(vec![]),
wal_removal_on_hold: false,
})
}
@@ -226,11 +222,10 @@ impl SharedState {
Ok(Self {
sk: SafeKeeper::new(control_store, wal_store, conf.my_id)?,
peers_info: PeersInfo(vec![]),
wal_removal_on_hold: false,
})
}
pub(crate) fn get_wal_seg_size(&self) -> usize {
fn get_wal_seg_size(&self) -> usize {
self.sk.state.server.wal_seg_size as usize
}

View File

@@ -39,7 +39,6 @@ pub struct StateSnapshot {
// misc
pub cfile_last_persist_at: Instant,
pub inmem_flush_pending: bool,
pub wal_removal_on_hold: bool,
pub peers: Vec<PeerInfo>,
}
@@ -55,7 +54,6 @@ impl StateSnapshot {
cfile_backup_lsn: read_guard.sk.state.backup_lsn,
cfile_last_persist_at: read_guard.sk.state.pers.last_persist_at(),
inmem_flush_pending: Self::has_unflushed_inmem_state(&read_guard),
wal_removal_on_hold: read_guard.wal_removal_on_hold,
peers: read_guard.get_peers(heartbeat_timeout),
}
}
@@ -326,8 +324,8 @@ async fn update_wal_removal(
last_removed_segno: u64,
wal_removal_task: &mut Option<JoinHandle<anyhow::Result<u64>>>,
) {
if wal_removal_task.is_some() || state.wal_removal_on_hold {
// WAL removal is already in progress or hold off
if wal_removal_task.is_some() {
// WAL removal is already in progress
return;
}

View File

@@ -684,12 +684,13 @@ impl WalReader {
let xlogoff = self.pos.segment_offset(self.wal_seg_size);
let segno = self.pos.segment_number(self.wal_seg_size);
let wal_file_name = XLogFileName(PG_TLI, segno, self.wal_seg_size);
let wal_file_path = self.timeline_dir.join(&wal_file_name);
// Try to open local file, if we may have WAL locally
if self.pos >= self.local_start_lsn {
let res = open_wal_file(&self.timeline_dir, segno, self.wal_seg_size).await;
let res = Self::open_wal_file(&wal_file_path).await;
match res {
Ok((mut file, _)) => {
Ok(mut file) => {
file.seek(SeekFrom::Start(xlogoff as u64)).await?;
return Ok(Box::pin(file));
}
@@ -717,6 +718,25 @@ impl WalReader {
bail!("WAL segment is not found")
}
/// Helper function for opening a wal file.
async fn open_wal_file(wal_file_path: &Utf8Path) -> Result<tokio::fs::File> {
// First try to open the .partial file.
let mut partial_path = wal_file_path.to_owned();
partial_path.set_extension("partial");
if let Ok(opened_file) = tokio::fs::File::open(&partial_path).await {
return Ok(opened_file);
}
// If that failed, try it without the .partial extension.
tokio::fs::File::open(&wal_file_path)
.await
.with_context(|| format!("Failed to open WAL file {:?}", wal_file_path))
.map_err(|e| {
warn!("{}", e);
e
})
}
}
/// Zero block for filling created WAL segments.
@@ -738,34 +758,6 @@ async fn write_zeroes(file: &mut File, mut count: usize) -> Result<()> {
Ok(())
}
/// Helper function for opening WAL segment `segno` in `dir`. Returns file and
/// whether it is .partial.
pub(crate) async fn open_wal_file(
timeline_dir: &Utf8Path,
segno: XLogSegNo,
wal_seg_size: usize,
) -> Result<(tokio::fs::File, bool)> {
let (wal_file_path, wal_file_partial_path) = wal_file_paths(timeline_dir, segno, wal_seg_size)?;
// First try to open the .partial file.
let mut partial_path = wal_file_path.to_owned();
partial_path.set_extension("partial");
if let Ok(opened_file) = tokio::fs::File::open(&wal_file_partial_path).await {
return Ok((opened_file, true));
}
// If that failed, try it without the .partial extension.
let pf = tokio::fs::File::open(&wal_file_path)
.await
.with_context(|| format!("failed to open WAL file {:#}", wal_file_path))
.map_err(|e| {
warn!("{}", e);
e
})?;
Ok((pf, false))
}
/// Helper returning full path to WAL segment file and its .partial brother.
pub fn wal_file_paths(
timeline_dir: &Utf8Path,

View File

@@ -174,7 +174,6 @@ pub fn run_server(os: NodeOs, disk: Arc<SafekeeperDisk>) -> Result<()> {
pg_auth: None,
pg_tenant_only_auth: None,
http_auth: None,
sk_auth_token: None,
current_thread_runtime: false,
walsenders_keep_horizon: false,
partial_backup_enabled: false,

View File

@@ -40,7 +40,6 @@ tokio.workspace = true
tokio-util.workspace = true
tracing.workspace = true
measured.workspace = true
scopeguard.workspace = true
strum.workspace = true
strum_macros.workspace = true

View File

@@ -1,59 +0,0 @@
use std::{borrow::Cow, fmt::Debug, fmt::Display};
use tokio_util::sync::CancellationToken;
use utils::id::NodeId;
pub(crate) const MAX_RECONCILES_PER_OPERATION: usize = 10;
#[derive(Copy, Clone)]
pub(crate) struct Drain {
pub(crate) node_id: NodeId,
}
#[derive(Copy, Clone)]
pub(crate) struct Fill {
pub(crate) node_id: NodeId,
}
#[derive(Copy, Clone)]
pub(crate) enum Operation {
Drain(Drain),
Fill(Fill),
}
#[derive(Debug, thiserror::Error)]
pub(crate) enum OperationError {
#[error("Node state changed during operation: {0}")]
NodeStateChanged(Cow<'static, str>),
#[error("Operation finalize error: {0}")]
FinalizeError(Cow<'static, str>),
#[error("Operation cancelled")]
Cancelled,
}
pub(crate) struct OperationHandler {
pub(crate) operation: Operation,
#[allow(unused)]
pub(crate) cancel: CancellationToken,
}
impl Display for Drain {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "drain {}", self.node_id)
}
}
impl Display for Fill {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "fill {}", self.node_id)
}
}
impl Display for Operation {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
match self {
Operation::Drain(op) => write!(f, "{op}"),
Operation::Fill(op) => write!(f, "{op}"),
}
}
}

View File

@@ -283,13 +283,7 @@ impl ComputeHook {
// all calls to this function
let _locked = self.neon_local_lock.lock().await;
let Some(repo_dir) = self.config.neon_local_repo_dir.as_deref() else {
tracing::warn!(
"neon_local_repo_dir not set, likely a bug in neon_local; skipping compute update"
);
return Ok(());
};
let env = match LocalEnv::load_config(repo_dir) {
let env = match LocalEnv::load_config() {
Ok(e) => e,
Err(e) => {
tracing::warn!("Couldn't load neon_local config, skipping compute update ({e})");

View File

@@ -31,7 +31,6 @@ pub(crate) enum PageserverState {
Available {
last_seen_at: Instant,
utilization: PageserverUtilization,
new: bool,
},
Offline,
}
@@ -128,7 +127,6 @@ impl HeartbeaterTask {
heartbeat_futs.push({
let jwt_token = self.jwt_token.clone();
let cancel = self.cancel.clone();
let new_node = !self.state.contains_key(node_id);
// Clone the node and mark it as available such that the request
// goes through to the pageserver even when the node is marked offline.
@@ -161,7 +159,6 @@ impl HeartbeaterTask {
PageserverState::Available {
last_seen_at: Instant::now(),
utilization,
new: new_node,
}
} else {
PageserverState::Offline
@@ -223,7 +220,6 @@ impl HeartbeaterTask {
}
},
Vacant(_) => {
// This is a new node. Don't generate a delta for it.
deltas.push((node_id, ps_state.clone()));
}
}

View File

@@ -480,39 +480,6 @@ async fn handle_node_configure(mut req: Request<Body>) -> Result<Response<Body>,
)
}
async fn handle_node_status(req: Request<Body>) -> Result<Response<Body>, ApiError> {
check_permissions(&req, Scope::Admin)?;
let state = get_state(&req);
let node_id: NodeId = parse_request_param(&req, "node_id")?;
let node_status = state.service.get_node(node_id).await?;
json_response(StatusCode::OK, node_status)
}
async fn handle_node_drain(req: Request<Body>) -> Result<Response<Body>, ApiError> {
check_permissions(&req, Scope::Admin)?;
let state = get_state(&req);
let node_id: NodeId = parse_request_param(&req, "node_id")?;
state.service.start_node_drain(node_id).await?;
json_response(StatusCode::ACCEPTED, ())
}
async fn handle_node_fill(req: Request<Body>) -> Result<Response<Body>, ApiError> {
check_permissions(&req, Scope::Admin)?;
let state = get_state(&req);
let node_id: NodeId = parse_request_param(&req, "node_id")?;
state.service.start_node_fill(node_id).await?;
json_response(StatusCode::ACCEPTED, ())
}
async fn handle_tenant_shard_split(
service: Arc<Service>,
mut req: Request<Body>,
@@ -865,16 +832,6 @@ pub fn make_router(
RequestName("control_v1_node_config"),
)
})
.get("/control/v1/node/:node_id", |r| {
named_request_span(r, handle_node_status, RequestName("control_v1_node_status"))
})
.put("/control/v1/node/:node_id/drain", |r| {
named_request_span(r, handle_node_drain, RequestName("control_v1_node_drain"))
})
.put("/control/v1/node/:node_id/fill", |r| {
named_request_span(r, handle_node_fill, RequestName("control_v1_node_fill"))
})
// TODO(vlad): endpoint for cancelling drain and fill
// Tenant Shard operations
.put("/control/v1/tenant/:tenant_shard_id/migrate", |r| {
tenant_service_handler(

View File

@@ -2,7 +2,6 @@ use serde::Serialize;
use utils::seqwait::MonotonicCounter;
mod auth;
mod background_node_operations;
mod compute_hook;
mod heartbeater;
pub mod http;

View File

@@ -4,7 +4,6 @@ use clap::Parser;
use diesel::Connection;
use metrics::launch_timestamp::LaunchTimestamp;
use metrics::BuildInfo;
use std::path::PathBuf;
use std::sync::Arc;
use storage_controller::http::make_router;
use storage_controller::metrics::preinitialize_metrics;
@@ -78,12 +77,6 @@ struct Cli {
/// How long to wait for the initial database connection to be available.
#[arg(long, default_value = "5s")]
db_connect_timeout: humantime::Duration,
/// `neon_local` sets this to the path of the neon_local repo dir.
/// Only relevant for testing.
// TODO: make `cfg(feature = "testing")`
#[arg(long)]
neon_local_repo_dir: Option<PathBuf>,
}
enum StrictMode {
@@ -267,7 +260,6 @@ async fn async_main() -> anyhow::Result<()> {
.reconciler_concurrency
.unwrap_or(RECONCILER_CONCURRENCY_DEFAULT),
split_threshold: args.split_threshold,
neon_local_repo_dir: args.neon_local_repo_dir,
};
// After loading secrets & config, but before starting anything else, apply database migrations

View File

@@ -3,7 +3,7 @@ use std::{str::FromStr, time::Duration};
use pageserver_api::{
controller_api::{
NodeAvailability, NodeDescribeResponse, NodeRegisterRequest, NodeSchedulingPolicy,
TenantLocateResponseShard, UtilizationScore,
TenantLocateResponseShard,
},
shard::TenantShardId,
};
@@ -59,10 +59,6 @@ impl Node {
self.id
}
pub(crate) fn get_scheduling(&self) -> NodeSchedulingPolicy {
self.scheduling
}
pub(crate) fn set_scheduling(&mut self, scheduling: NodeSchedulingPolicy) {
self.scheduling = scheduling
}
@@ -120,16 +116,6 @@ impl Node {
match (self.availability, availability) {
(Offline, Active(_)) => ToActive,
(Active(_), Offline) => ToOffline,
// Consider the case when the storage controller handles the re-attach of a node
// before the heartbeats detect that the node is back online. We still need
// [`Service::node_configure`] to attempt reconciliations for shards with an
// unknown observed location.
// The unsavoury match arm below handles this situation.
(Active(lhs), Active(rhs))
if lhs == UtilizationScore::worst() && rhs < UtilizationScore::worst() =>
{
ToActive
}
_ => Unchanged,
}
}
@@ -155,7 +141,6 @@ impl Node {
NodeSchedulingPolicy::Draining => MaySchedule::No,
NodeSchedulingPolicy::Filling => MaySchedule::Yes(score),
NodeSchedulingPolicy::Pause => MaySchedule::No,
NodeSchedulingPolicy::PauseForRestart => MaySchedule::No,
}
}
@@ -172,7 +157,7 @@ impl Node {
listen_http_port,
listen_pg_addr,
listen_pg_port,
scheduling: NodeSchedulingPolicy::Active,
scheduling: NodeSchedulingPolicy::Filling,
availability: NodeAvailability::Offline,
cancel: CancellationToken::new(),
}

View File

@@ -442,15 +442,13 @@ impl Persistence {
#[tracing::instrument(skip_all, fields(node_id))]
pub(crate) async fn re_attach(
&self,
input_node_id: NodeId,
node_id: NodeId,
) -> DatabaseResult<HashMap<TenantShardId, Generation>> {
use crate::schema::nodes::dsl::scheduling_policy;
use crate::schema::nodes::dsl::*;
use crate::schema::tenant_shards::dsl::*;
let updated = self
.with_measured_conn(DatabaseOperation::ReAttach, move |conn| {
let rows_updated = diesel::update(tenant_shards)
.filter(generation_pageserver.eq(input_node_id.0 as i64))
.filter(generation_pageserver.eq(node_id.0 as i64))
.set(generation.eq(generation + 1))
.execute(conn)?;
@@ -459,23 +457,9 @@ impl Persistence {
// TODO: UPDATE+SELECT in one query
let updated = tenant_shards
.filter(generation_pageserver.eq(input_node_id.0 as i64))
.filter(generation_pageserver.eq(node_id.0 as i64))
.select(TenantShardPersistence::as_select())
.load(conn)?;
// If the node went through a drain and restart phase before re-attaching,
// then reset it's node scheduling policy to active.
diesel::update(nodes)
.filter(node_id.eq(input_node_id.0 as i64))
.filter(
scheduling_policy
.eq(String::from(NodeSchedulingPolicy::PauseForRestart))
.or(scheduling_policy.eq(String::from(NodeSchedulingPolicy::Draining)))
.or(scheduling_policy.eq(String::from(NodeSchedulingPolicy::Filling))),
)
.set(scheduling_policy.eq(String::from(NodeSchedulingPolicy::Active)))
.execute(conn)?;
Ok(updated)
})
.await?;

View File

@@ -1,5 +1,4 @@
use crate::{node::Node, tenant_shard::TenantShard};
use itertools::Itertools;
use pageserver_api::controller_api::UtilizationScore;
use serde::Serialize;
use std::collections::HashMap;
@@ -284,44 +283,6 @@ impl Scheduler {
}
}
// Check if the number of shards attached to a given node is lagging below
// the cluster average. If that's the case, the node should be filled.
pub(crate) fn compute_fill_requirement(&self, node_id: NodeId) -> usize {
let Some(node) = self.nodes.get(&node_id) else {
debug_assert!(false);
tracing::error!("Scheduler missing node {node_id}");
return 0;
};
assert!(!self.nodes.is_empty());
let expected_attached_shards_per_node = self.expected_attached_shard_count();
for (node_id, node) in self.nodes.iter() {
tracing::trace!(%node_id, "attached_shard_count={} shard_count={} expected={}", node.attached_shard_count, node.shard_count, expected_attached_shards_per_node);
}
if node.attached_shard_count < expected_attached_shards_per_node {
expected_attached_shards_per_node - node.attached_shard_count
} else {
0
}
}
pub(crate) fn expected_attached_shard_count(&self) -> usize {
let total_attached_shards: usize =
self.nodes.values().map(|n| n.attached_shard_count).sum();
assert!(!self.nodes.is_empty());
total_attached_shards / self.nodes.len()
}
pub(crate) fn nodes_by_attached_shard_count(&self) -> Vec<(NodeId, usize)> {
self.nodes
.iter()
.map(|(node_id, stats)| (*node_id, stats.attached_shard_count))
.sorted_by(|lhs, rhs| Ord::cmp(&lhs.1, &rhs.1).reverse())
.collect()
}
pub(crate) fn node_upsert(&mut self, node: &Node) {
use std::collections::hash_map::Entry::*;
match self.nodes.entry(node.get_id()) {

Some files were not shown because too many files have changed in this diff Show More