mirror of
https://github.com/neondatabase/neon.git
synced 2026-02-13 23:50:36 +00:00
Compare commits
43 Commits
problame/r
...
arpad/comp
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
5e32cce2d9 | ||
|
|
6b67135bd3 | ||
|
|
66d3bef947 | ||
|
|
e749e73ad6 | ||
|
|
c5034f0e45 | ||
|
|
d2533c06a5 | ||
|
|
0fdeca882c | ||
|
|
f1bebda713 | ||
|
|
983972f812 | ||
|
|
2ea8d1b151 | ||
|
|
2f70221503 | ||
|
|
07bd0ce69e | ||
|
|
40e79712eb | ||
|
|
88b24e1593 | ||
|
|
8fcdc22283 | ||
|
|
2eb8b428cc | ||
|
|
e6a0e7ec61 | ||
|
|
843d996cb1 | ||
|
|
c824ffe1dc | ||
|
|
dadbd87ac1 | ||
|
|
0e667dcd93 | ||
|
|
14447b98ce | ||
|
|
8fcb236783 | ||
|
|
a9963db8c3 | ||
|
|
0c500450fe | ||
|
|
3182c3361a | ||
|
|
80803ff098 | ||
|
|
9b74d554b4 | ||
|
|
fce252fb2c | ||
|
|
f132658bd9 | ||
|
|
d030cbffec | ||
|
|
554a6bd4a6 | ||
|
|
9850794250 | ||
|
|
f5baac2579 | ||
|
|
2d37db234a | ||
|
|
8745c0d6f2 | ||
|
|
6c6a7f9ace | ||
|
|
e729f28205 | ||
|
|
b6e1c09c73 | ||
|
|
16d80128ee | ||
|
|
2ba414525e | ||
|
|
46210035c5 | ||
|
|
81892199f6 |
8
.github/workflows/benchmarking.yml
vendored
8
.github/workflows/benchmarking.yml
vendored
@@ -99,7 +99,7 @@ jobs:
|
||||
# Set --sparse-ordering option of pytest-order plugin
|
||||
# to ensure tests are running in order of appears in the file.
|
||||
# It's important for test_perf_pgbench.py::test_pgbench_remote_* tests
|
||||
extra_params: -m remote_cluster --sparse-ordering --timeout 5400 --ignore test_runner/performance/test_perf_olap.py
|
||||
extra_params: -m remote_cluster --sparse-ordering --timeout 5400 --ignore test_runner/performance/test_perf_olap.py --ignore test_runner/performance/test_perf_pgvector_queries.py
|
||||
env:
|
||||
BENCHMARK_CONNSTR: ${{ steps.create-neon-project.outputs.dsn }}
|
||||
VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"
|
||||
@@ -410,14 +410,14 @@ jobs:
|
||||
PERF_TEST_RESULT_CONNSTR: "${{ secrets.PERF_TEST_RESULT_CONNSTR }}"
|
||||
BENCHMARK_CONNSTR: ${{ steps.set-up-connstr.outputs.connstr }}
|
||||
|
||||
- name: Benchmark pgvector hnsw queries
|
||||
- name: Benchmark pgvector queries
|
||||
uses: ./.github/actions/run-python-test-set
|
||||
with:
|
||||
build_type: ${{ env.BUILD_TYPE }}
|
||||
test_selection: performance
|
||||
test_selection: performance/test_perf_pgvector_queries.py
|
||||
run_in_parallel: false
|
||||
save_perf_report: ${{ env.SAVE_PERF_REPORT }}
|
||||
extra_params: -m remote_cluster --timeout 21600 -k test_pgbench_remote_pgvector
|
||||
extra_params: -m remote_cluster --timeout 21600
|
||||
env:
|
||||
BENCHMARK_CONNSTR: ${{ steps.set-up-connstr.outputs.connstr }}
|
||||
VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"
|
||||
|
||||
@@ -30,7 +30,6 @@ jobs:
|
||||
check-image:
|
||||
uses: ./.github/workflows/check-build-tools-image.yml
|
||||
|
||||
# This job uses older version of GitHub Actions because it's run on gen2 runners, which don't support node 20 (for newer versions)
|
||||
build-image:
|
||||
needs: [ check-image ]
|
||||
if: needs.check-image.outputs.found == 'false'
|
||||
|
||||
62
.github/workflows/build_and_test.yml
vendored
62
.github/workflows/build_and_test.yml
vendored
@@ -299,21 +299,21 @@ jobs:
|
||||
uses: actions/cache@v4
|
||||
with:
|
||||
path: pg_install/v14
|
||||
key: v1-${{ runner.os }}-${{ matrix.build_type }}-pg-${{ steps.pg_v14_rev.outputs.pg_rev }}-${{ hashFiles('Makefile') }}
|
||||
key: v1-${{ runner.os }}-${{ matrix.build_type }}-pg-${{ steps.pg_v14_rev.outputs.pg_rev }}-${{ hashFiles('Makefile', 'Dockerfile.build-tools') }}
|
||||
|
||||
- name: Cache postgres v15 build
|
||||
id: cache_pg_15
|
||||
uses: actions/cache@v4
|
||||
with:
|
||||
path: pg_install/v15
|
||||
key: v1-${{ runner.os }}-${{ matrix.build_type }}-pg-${{ steps.pg_v15_rev.outputs.pg_rev }}-${{ hashFiles('Makefile') }}
|
||||
key: v1-${{ runner.os }}-${{ matrix.build_type }}-pg-${{ steps.pg_v15_rev.outputs.pg_rev }}-${{ hashFiles('Makefile', 'Dockerfile.build-tools') }}
|
||||
|
||||
- name: Cache postgres v16 build
|
||||
id: cache_pg_16
|
||||
uses: actions/cache@v4
|
||||
with:
|
||||
path: pg_install/v16
|
||||
key: v1-${{ runner.os }}-${{ matrix.build_type }}-pg-${{ steps.pg_v16_rev.outputs.pg_rev }}-${{ hashFiles('Makefile') }}
|
||||
key: v1-${{ runner.os }}-${{ matrix.build_type }}-pg-${{ steps.pg_v16_rev.outputs.pg_rev }}-${{ hashFiles('Makefile', 'Dockerfile.build-tools') }}
|
||||
|
||||
- name: Build postgres v14
|
||||
if: steps.cache_pg_14.outputs.cache-hit != 'true'
|
||||
@@ -337,34 +337,8 @@ jobs:
|
||||
run: |
|
||||
${cov_prefix} mold -run cargo build $CARGO_FLAGS $CARGO_FEATURES --bins --tests
|
||||
|
||||
- name: Run rust tests
|
||||
env:
|
||||
NEXTEST_RETRIES: 3
|
||||
run: |
|
||||
#nextest does not yet support running doctests
|
||||
cargo test --doc $CARGO_FLAGS $CARGO_FEATURES
|
||||
|
||||
for io_engine in std-fs tokio-epoll-uring ; do
|
||||
NEON_PAGESERVER_UNIT_TEST_VIRTUAL_FILE_IOENGINE=$io_engine ${cov_prefix} cargo nextest run $CARGO_FLAGS $CARGO_FEATURES
|
||||
done
|
||||
|
||||
# Run separate tests for real S3
|
||||
export ENABLE_REAL_S3_REMOTE_STORAGE=nonempty
|
||||
export REMOTE_STORAGE_S3_BUCKET=neon-github-ci-tests
|
||||
export REMOTE_STORAGE_S3_REGION=eu-central-1
|
||||
# Avoid `$CARGO_FEATURES` since there's no `testing` feature in the e2e tests now
|
||||
${cov_prefix} cargo nextest run $CARGO_FLAGS -E 'package(remote_storage)' -E 'test(test_real_s3)'
|
||||
|
||||
# Run separate tests for real Azure Blob Storage
|
||||
# XXX: replace region with `eu-central-1`-like region
|
||||
export ENABLE_REAL_AZURE_REMOTE_STORAGE=y
|
||||
export AZURE_STORAGE_ACCOUNT="${{ secrets.AZURE_STORAGE_ACCOUNT_DEV }}"
|
||||
export AZURE_STORAGE_ACCESS_KEY="${{ secrets.AZURE_STORAGE_ACCESS_KEY_DEV }}"
|
||||
export REMOTE_STORAGE_AZURE_CONTAINER="${{ vars.REMOTE_STORAGE_AZURE_CONTAINER }}"
|
||||
export REMOTE_STORAGE_AZURE_REGION="${{ vars.REMOTE_STORAGE_AZURE_REGION }}"
|
||||
# Avoid `$CARGO_FEATURES` since there's no `testing` feature in the e2e tests now
|
||||
${cov_prefix} cargo nextest run $CARGO_FLAGS -E 'package(remote_storage)' -E 'test(test_real_azure)'
|
||||
|
||||
# Do install *before* running rust tests because they might recompile the
|
||||
# binaries with different features/flags.
|
||||
- name: Install rust binaries
|
||||
run: |
|
||||
# Install target binaries
|
||||
@@ -405,6 +379,32 @@ jobs:
|
||||
done
|
||||
fi
|
||||
|
||||
- name: Run rust tests
|
||||
env:
|
||||
NEXTEST_RETRIES: 3
|
||||
run: |
|
||||
#nextest does not yet support running doctests
|
||||
cargo test --doc $CARGO_FLAGS $CARGO_FEATURES
|
||||
|
||||
for io_engine in std-fs tokio-epoll-uring ; do
|
||||
NEON_PAGESERVER_UNIT_TEST_VIRTUAL_FILE_IOENGINE=$io_engine ${cov_prefix} cargo nextest run $CARGO_FLAGS $CARGO_FEATURES
|
||||
done
|
||||
|
||||
# Run separate tests for real S3
|
||||
export ENABLE_REAL_S3_REMOTE_STORAGE=nonempty
|
||||
export REMOTE_STORAGE_S3_BUCKET=neon-github-ci-tests
|
||||
export REMOTE_STORAGE_S3_REGION=eu-central-1
|
||||
${cov_prefix} cargo nextest run $CARGO_FLAGS $CARGO_FEATURES -E 'package(remote_storage)' -E 'test(test_real_s3)'
|
||||
|
||||
# Run separate tests for real Azure Blob Storage
|
||||
# XXX: replace region with `eu-central-1`-like region
|
||||
export ENABLE_REAL_AZURE_REMOTE_STORAGE=y
|
||||
export AZURE_STORAGE_ACCOUNT="${{ secrets.AZURE_STORAGE_ACCOUNT_DEV }}"
|
||||
export AZURE_STORAGE_ACCESS_KEY="${{ secrets.AZURE_STORAGE_ACCESS_KEY_DEV }}"
|
||||
export REMOTE_STORAGE_AZURE_CONTAINER="${{ vars.REMOTE_STORAGE_AZURE_CONTAINER }}"
|
||||
export REMOTE_STORAGE_AZURE_REGION="${{ vars.REMOTE_STORAGE_AZURE_REGION }}"
|
||||
${cov_prefix} cargo nextest run $CARGO_FLAGS $CARGO_FEATURES -E 'package(remote_storage)' -E 'test(test_real_azure)'
|
||||
|
||||
- name: Install postgres binaries
|
||||
run: cp -a pg_install /tmp/neon/pg_install
|
||||
|
||||
|
||||
23
.github/workflows/check-build-tools-image.yml
vendored
23
.github/workflows/check-build-tools-image.yml
vendored
@@ -25,26 +25,17 @@ jobs:
|
||||
found: ${{ steps.check-image.outputs.found }}
|
||||
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
|
||||
- name: Get build-tools image tag for the current commit
|
||||
id: get-build-tools-tag
|
||||
env:
|
||||
# Usually, for COMMIT_SHA, we use `github.event.pull_request.head.sha || github.sha`, but here, even for PRs,
|
||||
# we want to use `github.sha` i.e. point to a phantom merge commit to determine the image tag correctly.
|
||||
COMMIT_SHA: ${{ github.sha }}
|
||||
GH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
|
||||
IMAGE_TAG: |
|
||||
${{ hashFiles('Dockerfile.build-tools',
|
||||
'.github/workflows/check-build-tools-image.yml',
|
||||
'.github/workflows/build-build-tools-image.yml') }}
|
||||
run: |
|
||||
LAST_BUILD_TOOLS_SHA=$(
|
||||
gh api \
|
||||
-H "Accept: application/vnd.github+json" \
|
||||
-H "X-GitHub-Api-Version: 2022-11-28" \
|
||||
--method GET \
|
||||
--field path=Dockerfile.build-tools \
|
||||
--field sha=${COMMIT_SHA} \
|
||||
--field per_page=1 \
|
||||
--jq ".[0].sha" \
|
||||
"/repos/${GITHUB_REPOSITORY}/commits"
|
||||
)
|
||||
echo "image-tag=${LAST_BUILD_TOOLS_SHA}" | tee -a $GITHUB_OUTPUT
|
||||
echo "image-tag=${IMAGE_TAG}" | tee -a $GITHUB_OUTPUT
|
||||
|
||||
- name: Check if such tag found in the registry
|
||||
id: check-image
|
||||
|
||||
10
Cargo.lock
generated
10
Cargo.lock
generated
@@ -2946,6 +2946,15 @@ dependencies = [
|
||||
"hashbrown 0.14.5",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "lz4_flex"
|
||||
version = "0.11.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "75761162ae2b0e580d7e7c390558127e5f01b4194debd6221fd8c207fc80e3f5"
|
||||
dependencies = [
|
||||
"twox-hash",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "match_cfg"
|
||||
version = "0.1.0"
|
||||
@@ -3612,6 +3621,7 @@ dependencies = [
|
||||
"hyper 0.14.26",
|
||||
"itertools",
|
||||
"leaky-bucket",
|
||||
"lz4_flex",
|
||||
"md5",
|
||||
"metrics",
|
||||
"nix 0.27.1",
|
||||
|
||||
@@ -110,6 +110,7 @@ jsonwebtoken = "9"
|
||||
lasso = "0.7"
|
||||
leaky-bucket = "1.0.1"
|
||||
libc = "0.2"
|
||||
lz4_flex = "0.11"
|
||||
md5 = "0.7.0"
|
||||
measured = { version = "0.0.21", features=["lasso"] }
|
||||
measured-process = { version = "0.0.21" }
|
||||
|
||||
@@ -69,8 +69,6 @@ RUN set -e \
|
||||
&& apt install -y \
|
||||
libreadline-dev \
|
||||
libseccomp-dev \
|
||||
libicu67 \
|
||||
openssl \
|
||||
ca-certificates \
|
||||
&& rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/* \
|
||||
&& useradd -d /data neon \
|
||||
|
||||
@@ -112,6 +112,45 @@ RUN for package in Capture::Tiny DateTime Devel::Cover Digest::MD5 File::Spec JS
|
||||
&& make install \
|
||||
&& rm -rf ../lcov.tar.gz
|
||||
|
||||
# Compile and install the static OpenSSL library
|
||||
ENV OPENSSL_VERSION=3.2.2
|
||||
ENV OPENSSL_PREFIX=/usr/local/openssl
|
||||
RUN wget -O /tmp/openssl-${OPENSSL_VERSION}.tar.gz https://www.openssl.org/source/openssl-${OPENSSL_VERSION}.tar.gz && \
|
||||
echo "197149c18d9e9f292c43f0400acaba12e5f52cacfe050f3d199277ea738ec2e7 /tmp/openssl-${OPENSSL_VERSION}.tar.gz" | sha256sum --check && \
|
||||
cd /tmp && \
|
||||
tar xzvf /tmp/openssl-${OPENSSL_VERSION}.tar.gz && \
|
||||
rm /tmp/openssl-${OPENSSL_VERSION}.tar.gz && \
|
||||
cd /tmp/openssl-${OPENSSL_VERSION} && \
|
||||
./config --prefix=${OPENSSL_PREFIX} -static --static no-shared -fPIC && \
|
||||
make -j "$(nproc)" && \
|
||||
make install && \
|
||||
cd /tmp && \
|
||||
rm -rf /tmp/openssl-${OPENSSL_VERSION}
|
||||
|
||||
# Use the same version of libicu as the compute nodes so that
|
||||
# clusters created using inidb on pageserver can be used by computes.
|
||||
#
|
||||
# TODO: at this time, Dockerfile.compute-node uses the debian bullseye libicu
|
||||
# package, which is 67.1. We're duplicating that knowledge here, and also, technically,
|
||||
# Debian has a few patches on top of 67.1 that we're not adding here.
|
||||
ENV ICU_VERSION=67.1
|
||||
ENV ICU_PREFIX=/usr/local/icu
|
||||
|
||||
# Download and build static ICU
|
||||
RUN wget -O /tmp/libicu-${ICU_VERSION}.tgz https://github.com/unicode-org/icu/releases/download/release-${ICU_VERSION//./-}/icu4c-${ICU_VERSION//./_}-src.tgz && \
|
||||
echo "94a80cd6f251a53bd2a997f6f1b5ac6653fe791dfab66e1eb0227740fb86d5dc /tmp/libicu-${ICU_VERSION}.tgz" | sha256sum --check && \
|
||||
mkdir /tmp/icu && \
|
||||
pushd /tmp/icu && \
|
||||
tar -xzf /tmp/libicu-${ICU_VERSION}.tgz && \
|
||||
pushd icu/source && \
|
||||
./configure --prefix=${ICU_PREFIX} --enable-static --enable-shared=no CXXFLAGS="-fPIC" CFLAGS="-fPIC" && \
|
||||
make -j "$(nproc)" && \
|
||||
make install && \
|
||||
popd && \
|
||||
rm -rf icu && \
|
||||
rm -f /tmp/libicu-${ICU_VERSION}.tgz && \
|
||||
popd
|
||||
|
||||
# Switch to nonroot user
|
||||
USER nonroot:nonroot
|
||||
WORKDIR /home/nonroot
|
||||
@@ -170,3 +209,6 @@ RUN whoami \
|
||||
&& rustup --version --verbose \
|
||||
&& rustc --version --verbose \
|
||||
&& clang --version
|
||||
|
||||
# Set following flag to check in Makefile if its running in Docker
|
||||
RUN touch /home/nonroot/.docker_build
|
||||
|
||||
15
Makefile
15
Makefile
@@ -3,6 +3,9 @@ ROOT_PROJECT_DIR := $(dir $(abspath $(lastword $(MAKEFILE_LIST))))
|
||||
# Where to install Postgres, default is ./pg_install, maybe useful for package managers
|
||||
POSTGRES_INSTALL_DIR ?= $(ROOT_PROJECT_DIR)/pg_install/
|
||||
|
||||
OPENSSL_PREFIX_DIR := /usr/local/openssl
|
||||
ICU_PREFIX_DIR := /usr/local/icu
|
||||
|
||||
#
|
||||
# We differentiate between release / debug build types using the BUILD_TYPE
|
||||
# environment variable.
|
||||
@@ -20,6 +23,16 @@ else
|
||||
$(error Bad build type '$(BUILD_TYPE)', see Makefile for options)
|
||||
endif
|
||||
|
||||
ifeq ($(shell test -e /home/nonroot/.docker_build && echo -n yes),yes)
|
||||
# Exclude static build openssl, icu for local build (MacOS, Linux)
|
||||
# Only keep for build type release and debug
|
||||
PG_CFLAGS += -I$(OPENSSL_PREFIX_DIR)/include
|
||||
PG_CONFIGURE_OPTS += --with-icu
|
||||
PG_CONFIGURE_OPTS += ICU_CFLAGS='-I/$(ICU_PREFIX_DIR)/include -DU_STATIC_IMPLEMENTATION'
|
||||
PG_CONFIGURE_OPTS += ICU_LIBS='-L$(ICU_PREFIX_DIR)/lib -L$(ICU_PREFIX_DIR)/lib64 -licui18n -licuuc -licudata -lstdc++ -Wl,-Bdynamic -lm'
|
||||
PG_CONFIGURE_OPTS += LDFLAGS='-L$(OPENSSL_PREFIX_DIR)/lib -L$(OPENSSL_PREFIX_DIR)/lib64 -L$(ICU_PREFIX_DIR)/lib -L$(ICU_PREFIX_DIR)/lib64 -Wl,-Bstatic -lssl -lcrypto -Wl,-Bdynamic -lrt -lm -ldl -lpthread'
|
||||
endif
|
||||
|
||||
UNAME_S := $(shell uname -s)
|
||||
ifeq ($(UNAME_S),Linux)
|
||||
# Seccomp BPF is only available for Linux
|
||||
@@ -28,7 +41,7 @@ else ifeq ($(UNAME_S),Darwin)
|
||||
ifndef DISABLE_HOMEBREW
|
||||
# macOS with brew-installed openssl requires explicit paths
|
||||
# It can be configured with OPENSSL_PREFIX variable
|
||||
OPENSSL_PREFIX ?= $(shell brew --prefix openssl@3)
|
||||
OPENSSL_PREFIX := $(shell brew --prefix openssl@3)
|
||||
PG_CONFIGURE_OPTS += --with-includes=$(OPENSSL_PREFIX)/include --with-libraries=$(OPENSSL_PREFIX)/lib
|
||||
PG_CONFIGURE_OPTS += PKG_CONFIG_PATH=$(shell brew --prefix icu4c)/lib/pkgconfig
|
||||
# macOS already has bison and flex in the system, but they are old and result in postgres-v14 target failure
|
||||
|
||||
@@ -918,38 +918,39 @@ impl ComputeNode {
|
||||
// temporarily reset max_cluster_size in config
|
||||
// to avoid the possibility of hitting the limit, while we are reconfiguring:
|
||||
// creating new extensions, roles, etc...
|
||||
config::compute_ctl_temp_override_create(pgdata_path, "neon.max_cluster_size=-1")?;
|
||||
self.pg_reload_conf()?;
|
||||
config::with_compute_ctl_tmp_override(pgdata_path, "neon.max_cluster_size=-1", || {
|
||||
self.pg_reload_conf()?;
|
||||
|
||||
let mut client = Client::connect(self.connstr.as_str(), NoTls)?;
|
||||
let mut client = Client::connect(self.connstr.as_str(), NoTls)?;
|
||||
|
||||
// Proceed with post-startup configuration. Note, that order of operations is important.
|
||||
// Disable DDL forwarding because control plane already knows about these roles/databases.
|
||||
if spec.mode == ComputeMode::Primary {
|
||||
client.simple_query("SET neon.forward_ddl = false")?;
|
||||
cleanup_instance(&mut client)?;
|
||||
handle_roles(&spec, &mut client)?;
|
||||
handle_databases(&spec, &mut client)?;
|
||||
handle_role_deletions(&spec, self.connstr.as_str(), &mut client)?;
|
||||
handle_grants(
|
||||
&spec,
|
||||
&mut client,
|
||||
self.connstr.as_str(),
|
||||
self.has_feature(ComputeFeature::AnonExtension),
|
||||
)?;
|
||||
handle_extensions(&spec, &mut client)?;
|
||||
handle_extension_neon(&mut client)?;
|
||||
// We can skip handle_migrations here because a new migration can only appear
|
||||
// if we have a new version of the compute_ctl binary, which can only happen
|
||||
// if compute got restarted, in which case we'll end up inside of apply_config
|
||||
// instead of reconfigure.
|
||||
}
|
||||
// Proceed with post-startup configuration. Note, that order of operations is important.
|
||||
// Disable DDL forwarding because control plane already knows about these roles/databases.
|
||||
if spec.mode == ComputeMode::Primary {
|
||||
client.simple_query("SET neon.forward_ddl = false")?;
|
||||
cleanup_instance(&mut client)?;
|
||||
handle_roles(&spec, &mut client)?;
|
||||
handle_databases(&spec, &mut client)?;
|
||||
handle_role_deletions(&spec, self.connstr.as_str(), &mut client)?;
|
||||
handle_grants(
|
||||
&spec,
|
||||
&mut client,
|
||||
self.connstr.as_str(),
|
||||
self.has_feature(ComputeFeature::AnonExtension),
|
||||
)?;
|
||||
handle_extensions(&spec, &mut client)?;
|
||||
handle_extension_neon(&mut client)?;
|
||||
// We can skip handle_migrations here because a new migration can only appear
|
||||
// if we have a new version of the compute_ctl binary, which can only happen
|
||||
// if compute got restarted, in which case we'll end up inside of apply_config
|
||||
// instead of reconfigure.
|
||||
}
|
||||
|
||||
// 'Close' connection
|
||||
drop(client);
|
||||
// 'Close' connection
|
||||
drop(client);
|
||||
|
||||
Ok(())
|
||||
})?;
|
||||
|
||||
// reset max_cluster_size in config back to original value and reload config
|
||||
config::compute_ctl_temp_override_remove(pgdata_path)?;
|
||||
self.pg_reload_conf()?;
|
||||
|
||||
let unknown_op = "unknown".to_string();
|
||||
@@ -1040,12 +1041,17 @@ impl ComputeNode {
|
||||
// temporarily reset max_cluster_size in config
|
||||
// to avoid the possibility of hitting the limit, while we are applying config:
|
||||
// creating new extensions, roles, etc...
|
||||
config::compute_ctl_temp_override_create(pgdata_path, "neon.max_cluster_size=-1")?;
|
||||
self.pg_reload_conf()?;
|
||||
config::with_compute_ctl_tmp_override(
|
||||
pgdata_path,
|
||||
"neon.max_cluster_size=-1",
|
||||
|| {
|
||||
self.pg_reload_conf()?;
|
||||
|
||||
self.apply_config(&compute_state)?;
|
||||
self.apply_config(&compute_state)?;
|
||||
|
||||
config::compute_ctl_temp_override_remove(pgdata_path)?;
|
||||
Ok(())
|
||||
},
|
||||
)?;
|
||||
self.pg_reload_conf()?;
|
||||
}
|
||||
self.post_apply_config()?;
|
||||
|
||||
@@ -131,18 +131,17 @@ pub fn write_postgres_conf(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// create file compute_ctl_temp_override.conf in pgdata_dir
|
||||
/// add provided options to this file
|
||||
pub fn compute_ctl_temp_override_create(pgdata_path: &Path, options: &str) -> Result<()> {
|
||||
pub fn with_compute_ctl_tmp_override<F>(pgdata_path: &Path, options: &str, exec: F) -> Result<()>
|
||||
where
|
||||
F: FnOnce() -> Result<()>,
|
||||
{
|
||||
let path = pgdata_path.join("compute_ctl_temp_override.conf");
|
||||
let mut file = File::create(path)?;
|
||||
write!(file, "{}", options)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// remove file compute_ctl_temp_override.conf in pgdata_dir
|
||||
pub fn compute_ctl_temp_override_remove(pgdata_path: &Path) -> Result<()> {
|
||||
let path = pgdata_path.join("compute_ctl_temp_override.conf");
|
||||
std::fs::remove_file(path)?;
|
||||
Ok(())
|
||||
let res = exec();
|
||||
|
||||
file.set_len(0)?;
|
||||
|
||||
res
|
||||
}
|
||||
|
||||
@@ -17,7 +17,7 @@ use hyper::header::CONTENT_TYPE;
|
||||
use hyper::service::{make_service_fn, service_fn};
|
||||
use hyper::{Body, Method, Request, Response, Server, StatusCode};
|
||||
use tokio::task;
|
||||
use tracing::{error, info, warn};
|
||||
use tracing::{debug, error, info, warn};
|
||||
use tracing_utils::http::OtelName;
|
||||
use utils::http::request::must_get_query_param;
|
||||
|
||||
@@ -48,7 +48,7 @@ async fn routes(req: Request<Body>, compute: &Arc<ComputeNode>) -> Response<Body
|
||||
match (req.method(), req.uri().path()) {
|
||||
// Serialized compute state.
|
||||
(&Method::GET, "/status") => {
|
||||
info!("serving /status GET request");
|
||||
debug!("serving /status GET request");
|
||||
let state = compute.state.lock().unwrap();
|
||||
let status_response = status_response_from_state(&state);
|
||||
Response::new(Body::from(serde_json::to_string(&status_response).unwrap()))
|
||||
|
||||
@@ -1,139 +0,0 @@
|
||||
# Postgres Bundle for Pageserver
|
||||
|
||||
Created on 2024-06-17
|
||||
|
||||
## Summary
|
||||
|
||||
This RFC defines the responsibilities of Compute and Storage team regarding the
|
||||
build & deployment of the Postgres code that Pageserver must run
|
||||
(`initdb`, `postgres --wal-redo`).
|
||||
|
||||
## Motivation
|
||||
|
||||
Pageserver has to run Postgres binaries to do its job, specifically
|
||||
|
||||
* `initdb`
|
||||
* `postgres --wal-redo` mode
|
||||
|
||||
Currently there is **no clear ownership** of
|
||||
* how these binaries are built
|
||||
* including, critically, dynamic linkage against other libraries such as `libicu`
|
||||
* what build of the binaries ends up running on Pageservers
|
||||
* how the binaries and runtime dependencies (e.g., shared libraries) are delivered to Pageservers
|
||||
|
||||
Further, these binaries have dependencies (e.g., libicu) which
|
||||
1. prevent the Storage team from switching Pageserver distro and/or version, and
|
||||
2. some dependencies impact compatibility between Storage and Compute (e.g., [libicu version impacts collation incompatibilty](https://github.com/neondatabase/neon/pull/8074))
|
||||
3. some dependencies can cause database corruption if updated carelessly (locale => libc)
|
||||
|
||||
## Why Is This Worth Solving
|
||||
|
||||
1. Clearly defined ownership generally boosts execution speed & bug triage.
|
||||
* Example for why execution speed matters: CVE in dependency => who takes care of patching & updating.
|
||||
2. Centralize understanding of risks involved with some dependencies.
|
||||
Currently, there is no team clearly responsible for assessing / tracking the risks. As a reminder from previous section, these are
|
||||
* runtime incompatibilities
|
||||
* database corruption
|
||||
|
||||
Also, it is an unlock for additional future value, see "Future Work" section.
|
||||
|
||||
## Impacted components (e.g. pageserver, safekeeper, console, etc)
|
||||
|
||||
Pageserver (neon.git)
|
||||
Compute (neon.git)
|
||||
Deployment process (aws.git)
|
||||
|
||||
## Design
|
||||
|
||||
The basic interface between Compute and Storage team is as follows:
|
||||
|
||||
* Compute team publishes a "bundle" of the binaries required by Pageserver
|
||||
* Storage team uses a pinned bundle in the Pageserver build process
|
||||
* Storage team code review is required to update the pinned version
|
||||
|
||||
The "bundle" provides an interface agreed upon by Compute and Storage teams to run
|
||||
* for each supported Postgres version at Neon (v14, v15, v16, ...)
|
||||
* the `initdb` process
|
||||
* behaving like a vanilla Postgres `initdb`
|
||||
* `postgres --wal-redo` mode process
|
||||
* following the walredo protocol specified elsewhere
|
||||
|
||||
The bundle is self-contained, i.e., it behaves the same way on any Linux system.
|
||||
The only ambient runtime dependency is the Linux kernel.
|
||||
The minimum Linux kernel version is 5.10.
|
||||
|
||||
### Variant 1: bundle = fully statically linked binaries
|
||||
The "bundle" is a tarball of fully statically linked binaries
|
||||
|
||||
```
|
||||
v14/initdb
|
||||
v14/postgres
|
||||
v15/initdb
|
||||
v15/postgres
|
||||
v16/initdb
|
||||
v16/postgres
|
||||
...
|
||||
```
|
||||
|
||||
The directory structure is part of the interface.
|
||||
|
||||
### Variant 2: bundle = chrooted directory
|
||||
|
||||
The "bundle" is a tarball that contains all sorts of files, plus a launcher script.
|
||||
|
||||
```
|
||||
LAUNCHER
|
||||
storage
|
||||
storage/does
|
||||
storage/does/not
|
||||
storage/does/not/care
|
||||
```
|
||||
|
||||
To launch `initdb` or `postgres --wal-redo`, the Pageserver does
|
||||
1. fork child process
|
||||
2. `chroot` into the extracted directory
|
||||
3. inside the chroot, run `/LAUNCHER VERSION PG_BINARY [FLAGS...]`
|
||||
4. The `LAUNCHER` script sets up library search paths, etc, and then `exec`s the correct binary
|
||||
|
||||
We acknowledge this is half-way reinventing OCI + linux containers.
|
||||
However, our needs are much simpler than what OCI & Docker provide.
|
||||
Specifically, we do not want Pageserver to be runtime-dependent on e.g. Docker as the launcher.
|
||||
|
||||
The `chroot` is to enforce that the "bundle" be self-contained.
|
||||
The special path `/inout` int he bundle is reserved, e.g., for `initdb` output.
|
||||
|
||||
### Variant 3: ???
|
||||
|
||||
Your design here, feedback welcome.
|
||||
|
||||
## Security implications
|
||||
|
||||
It's an improvement because a single team (Compute) will be responsible for runtime dependencies.
|
||||
|
||||
## Implementation & Rollout
|
||||
|
||||
Storage and Compute teams agree on a bundle definition.
|
||||
|
||||
Compute team changes their build process to produce both
|
||||
1. existing: compute image / vm compute image
|
||||
2. existing: pg_install tarball (currently built by `neon.git:Dockerfile`)
|
||||
2. new: the bundle
|
||||
|
||||
Storage makes `neon.git` Pageserver changes to support using bundle (behind feature flag).
|
||||
With feature flag disabled, existing `pg_install` tarball is used instead.
|
||||
|
||||
Storage & infra make `aws.git` changes to deploy bundle to pageservers, with feature flag disabled.
|
||||
|
||||
Storage team does gradual rollout.
|
||||
|
||||
Storage & infra teams remove support for `pg_install`, delete it from the nodes (experimentation in staging to ensure no hidden runtime deps!)
|
||||
|
||||
Compute team stops producing `pg_install` tarball.
|
||||
|
||||
|
||||
## Future Work
|
||||
|
||||
We know that we can easily make pageserver fully statically linked.
|
||||
Together with the self-contained "bundle" proposed above, Pageserver can then be deployed to different OSes.
|
||||
For example, we have been entertaining the idea of trying Amazon Linux instead of Debian for Pageserver.
|
||||
That experiment would be a lot simpler.
|
||||
@@ -558,6 +558,12 @@ impl KeySpaceRandomAccum {
|
||||
self.ranges.push(range);
|
||||
}
|
||||
|
||||
pub fn add_keyspace(&mut self, keyspace: KeySpace) {
|
||||
for range in keyspace.ranges {
|
||||
self.add_range(range);
|
||||
}
|
||||
}
|
||||
|
||||
pub fn to_keyspace(mut self) -> KeySpace {
|
||||
let mut ranges = Vec::new();
|
||||
if !self.ranges.is_empty() {
|
||||
|
||||
@@ -455,6 +455,26 @@ pub enum CompactionAlgorithm {
|
||||
Tiered,
|
||||
}
|
||||
|
||||
#[derive(
|
||||
Debug,
|
||||
Clone,
|
||||
Copy,
|
||||
PartialEq,
|
||||
Eq,
|
||||
Serialize,
|
||||
Deserialize,
|
||||
strum_macros::FromRepr,
|
||||
strum_macros::EnumString,
|
||||
enum_map::Enum,
|
||||
)]
|
||||
#[strum(serialize_all = "kebab-case")]
|
||||
pub enum ImageCompressionAlgorithm {
|
||||
ZstdLow,
|
||||
Zstd,
|
||||
ZstdHigh,
|
||||
LZ4,
|
||||
}
|
||||
|
||||
#[derive(Eq, PartialEq, Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct CompactionAlgorithmSettings {
|
||||
pub kind: CompactionAlgorithm,
|
||||
|
||||
@@ -25,6 +25,8 @@ pub struct Config {
|
||||
///
|
||||
/// For simplicity, this value must be greater than or equal to `memory_history_len`.
|
||||
memory_history_log_interval: usize,
|
||||
/// The max number of iterations to skip before logging the next iteration
|
||||
memory_history_log_noskip_interval: Duration,
|
||||
}
|
||||
|
||||
impl Default for Config {
|
||||
@@ -33,6 +35,7 @@ impl Default for Config {
|
||||
memory_poll_interval: Duration::from_millis(100),
|
||||
memory_history_len: 5, // use 500ms of history for decision-making
|
||||
memory_history_log_interval: 20, // but only log every ~2s (otherwise it's spammy)
|
||||
memory_history_log_noskip_interval: Duration::from_secs(15), // but only if it's changed, or 60 seconds have passed
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -85,7 +88,12 @@ impl CgroupWatcher {
|
||||
|
||||
// buffer for samples that will be logged. once full, it remains so.
|
||||
let history_log_len = self.config.memory_history_log_interval;
|
||||
let max_skip = self.config.memory_history_log_noskip_interval;
|
||||
let mut history_log_buf = vec![MemoryStatus::zeroed(); history_log_len];
|
||||
let mut last_logged_memusage = MemoryStatus::zeroed();
|
||||
|
||||
// Ensure that we're tracking a value that's definitely in the past, as Instant::now is only guaranteed to be non-decreasing on Rust's T1-supported systems.
|
||||
let mut can_skip_logs_until = Instant::now() - max_skip;
|
||||
|
||||
for t in 0_u64.. {
|
||||
ticker.tick().await;
|
||||
@@ -115,12 +123,24 @@ impl CgroupWatcher {
|
||||
// equal to the logging interval, we can just log the entire buffer every time we set
|
||||
// the last entry, which also means that for this log line, we can ignore that it's a
|
||||
// ring buffer (because all the entries are in order of increasing time).
|
||||
if i == history_log_len - 1 {
|
||||
//
|
||||
// We skip logging the data if data hasn't meaningfully changed in a while, unless
|
||||
// we've already ignored previous iterations for the last max_skip period.
|
||||
if i == history_log_len - 1
|
||||
&& (now > can_skip_logs_until
|
||||
|| !history_log_buf
|
||||
.iter()
|
||||
.all(|usage| last_logged_memusage.status_is_close_or_similar(usage)))
|
||||
{
|
||||
info!(
|
||||
history = ?MemoryStatus::debug_slice(&history_log_buf),
|
||||
summary = ?summary,
|
||||
"Recent cgroup memory statistics history"
|
||||
);
|
||||
|
||||
can_skip_logs_until = now + max_skip;
|
||||
|
||||
last_logged_memusage = *history_log_buf.last().unwrap();
|
||||
}
|
||||
|
||||
updates
|
||||
@@ -232,6 +252,24 @@ impl MemoryStatus {
|
||||
|
||||
DS(slice)
|
||||
}
|
||||
|
||||
/// Check if the other memory status is a close or similar result.
|
||||
/// Returns true if the larger value is not larger than the smaller value
|
||||
/// by 1/8 of the smaller value, and within 128MiB.
|
||||
/// See tests::check_similarity_behaviour for examples of behaviour
|
||||
fn status_is_close_or_similar(&self, other: &MemoryStatus) -> bool {
|
||||
let margin;
|
||||
let diff;
|
||||
if self.non_reclaimable >= other.non_reclaimable {
|
||||
margin = other.non_reclaimable / 8;
|
||||
diff = self.non_reclaimable - other.non_reclaimable;
|
||||
} else {
|
||||
margin = self.non_reclaimable / 8;
|
||||
diff = other.non_reclaimable - self.non_reclaimable;
|
||||
}
|
||||
|
||||
diff < margin && diff < 128 * 1024 * 1024
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
@@ -261,4 +299,65 @@ mod tests {
|
||||
assert_eq!(values(2, 4), [9, 0, 1, 2]);
|
||||
assert_eq!(values(2, 10), [3, 4, 5, 6, 7, 8, 9, 0, 1, 2]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn check_similarity_behaviour() {
|
||||
// This all accesses private methods, so we can't actually run this
|
||||
// as doctests, because doctests run as an external crate.
|
||||
let mut small = super::MemoryStatus {
|
||||
non_reclaimable: 1024,
|
||||
};
|
||||
let mut large = super::MemoryStatus {
|
||||
non_reclaimable: 1024 * 1024 * 1024 * 1024,
|
||||
};
|
||||
|
||||
// objects are self-similar, no matter the size
|
||||
assert!(small.status_is_close_or_similar(&small));
|
||||
assert!(large.status_is_close_or_similar(&large));
|
||||
|
||||
// inequality is symmetric
|
||||
assert!(!small.status_is_close_or_similar(&large));
|
||||
assert!(!large.status_is_close_or_similar(&small));
|
||||
|
||||
small.non_reclaimable = 64;
|
||||
large.non_reclaimable = (small.non_reclaimable / 8) * 9;
|
||||
|
||||
// objects are self-similar, no matter the size
|
||||
assert!(small.status_is_close_or_similar(&small));
|
||||
assert!(large.status_is_close_or_similar(&large));
|
||||
|
||||
// values are similar if the larger value is larger by less than
|
||||
// 12.5%, i.e. 1/8 of the smaller value.
|
||||
// In the example above, large is exactly 12.5% larger, so this doesn't
|
||||
// match.
|
||||
assert!(!small.status_is_close_or_similar(&large));
|
||||
assert!(!large.status_is_close_or_similar(&small));
|
||||
|
||||
large.non_reclaimable -= 1;
|
||||
assert!(large.status_is_close_or_similar(&large));
|
||||
|
||||
assert!(small.status_is_close_or_similar(&large));
|
||||
assert!(large.status_is_close_or_similar(&small));
|
||||
|
||||
// The 1/8 rule only applies up to 128MiB of difference
|
||||
small.non_reclaimable = 1024 * 1024 * 1024 * 1024;
|
||||
large.non_reclaimable = small.non_reclaimable / 8 * 9;
|
||||
assert!(small.status_is_close_or_similar(&small));
|
||||
assert!(large.status_is_close_or_similar(&large));
|
||||
|
||||
assert!(!small.status_is_close_or_similar(&large));
|
||||
assert!(!large.status_is_close_or_similar(&small));
|
||||
// the large value is put just above the threshold
|
||||
large.non_reclaimable = small.non_reclaimable + 128 * 1024 * 1024;
|
||||
assert!(large.status_is_close_or_similar(&large));
|
||||
|
||||
assert!(!small.status_is_close_or_similar(&large));
|
||||
assert!(!large.status_is_close_or_similar(&small));
|
||||
// now below
|
||||
large.non_reclaimable -= 1;
|
||||
assert!(large.status_is_close_or_similar(&large));
|
||||
|
||||
assert!(small.status_is_close_or_similar(&large));
|
||||
assert!(large.status_is_close_or_similar(&small));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -12,11 +12,11 @@ use futures::{
|
||||
stream::{SplitSink, SplitStream},
|
||||
SinkExt, StreamExt,
|
||||
};
|
||||
use tracing::info;
|
||||
use tracing::{debug, info};
|
||||
|
||||
use crate::protocol::{
|
||||
OutboundMsg, ProtocolRange, ProtocolResponse, ProtocolVersion, PROTOCOL_MAX_VERSION,
|
||||
PROTOCOL_MIN_VERSION,
|
||||
OutboundMsg, OutboundMsgKind, ProtocolRange, ProtocolResponse, ProtocolVersion,
|
||||
PROTOCOL_MAX_VERSION, PROTOCOL_MIN_VERSION,
|
||||
};
|
||||
|
||||
/// The central handler for all communications in the monitor.
|
||||
@@ -118,7 +118,12 @@ impl Dispatcher {
|
||||
/// serialize the wrong thing and send it, since `self.sink.send` will take
|
||||
/// any string.
|
||||
pub async fn send(&mut self, message: OutboundMsg) -> anyhow::Result<()> {
|
||||
info!(?message, "sending message");
|
||||
if matches!(&message.inner, OutboundMsgKind::HealthCheck { .. }) {
|
||||
debug!(?message, "sending message");
|
||||
} else {
|
||||
info!(?message, "sending message");
|
||||
}
|
||||
|
||||
let json = serde_json::to_string(&message).context("failed to serialize message")?;
|
||||
self.sink
|
||||
.send(Message::Text(json))
|
||||
|
||||
@@ -12,7 +12,7 @@ use axum::extract::ws::{Message, WebSocket};
|
||||
use futures::StreamExt;
|
||||
use tokio::sync::{broadcast, watch};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{error, info, warn};
|
||||
use tracing::{debug, error, info, warn};
|
||||
|
||||
use crate::cgroup::{self, CgroupWatcher};
|
||||
use crate::dispatcher::Dispatcher;
|
||||
@@ -474,26 +474,29 @@ impl Runner {
|
||||
// there is a message from the agent
|
||||
msg = self.dispatcher.source.next() => {
|
||||
if let Some(msg) = msg {
|
||||
// Don't use 'message' as a key as the string also uses
|
||||
// that for its key
|
||||
info!(?msg, "received message");
|
||||
match msg {
|
||||
match &msg {
|
||||
Ok(msg) => {
|
||||
let message: InboundMsg = match msg {
|
||||
Message::Text(text) => {
|
||||
serde_json::from_str(&text).context("failed to deserialize text message")?
|
||||
serde_json::from_str(text).context("failed to deserialize text message")?
|
||||
}
|
||||
other => {
|
||||
warn!(
|
||||
// Don't use 'message' as a key as the
|
||||
// string also uses that for its key
|
||||
msg = ?other,
|
||||
"agent should only send text messages but received different type"
|
||||
"problem processing incoming message: agent should only send text messages but received different type"
|
||||
);
|
||||
continue
|
||||
},
|
||||
};
|
||||
|
||||
if matches!(&message.inner, InboundMsgKind::HealthCheck { .. }) {
|
||||
debug!(?msg, "received message");
|
||||
} else {
|
||||
info!(?msg, "received message");
|
||||
}
|
||||
|
||||
let out = match self.process_message(message.clone()).await {
|
||||
Ok(Some(out)) => out,
|
||||
Ok(None) => continue,
|
||||
@@ -517,7 +520,11 @@ impl Runner {
|
||||
.await
|
||||
.context("failed to send message")?;
|
||||
}
|
||||
Err(e) => warn!("{e}"),
|
||||
Err(e) => warn!(
|
||||
error = format!("{e}"),
|
||||
msg = ?msg,
|
||||
"received error message"
|
||||
),
|
||||
}
|
||||
} else {
|
||||
anyhow::bail!("dispatcher connection closed")
|
||||
|
||||
@@ -37,6 +37,7 @@ humantime-serde.workspace = true
|
||||
hyper.workspace = true
|
||||
itertools.workspace = true
|
||||
leaky-bucket.workspace = true
|
||||
lz4_flex.workspace = true
|
||||
md5.workspace = true
|
||||
nix.workspace = true
|
||||
# hack to get the number of worker threads tokio uses
|
||||
|
||||
@@ -1,4 +1,7 @@
|
||||
use std::num::NonZeroU32;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::str::FromStr;
|
||||
use std::sync::Arc;
|
||||
|
||||
use anyhow::Result;
|
||||
use camino::{Utf8Path, Utf8PathBuf};
|
||||
@@ -8,7 +11,7 @@ use pageserver::task_mgr::TaskKind;
|
||||
use pageserver::tenant::block_io::BlockCursor;
|
||||
use pageserver::tenant::disk_btree::DiskBtreeReader;
|
||||
use pageserver::tenant::storage_layer::delta_layer::{BlobRef, Summary};
|
||||
use pageserver::tenant::storage_layer::{delta_layer, image_layer};
|
||||
use pageserver::tenant::storage_layer::{delta_layer, image_layer, LayerName};
|
||||
use pageserver::tenant::storage_layer::{DeltaLayer, ImageLayer};
|
||||
use pageserver::tenant::{TENANTS_SEGMENT_NAME, TIMELINES_SEGMENT_NAME};
|
||||
use pageserver::{page_cache, virtual_file};
|
||||
@@ -20,7 +23,12 @@ use pageserver::{
|
||||
},
|
||||
virtual_file::VirtualFile,
|
||||
};
|
||||
use pageserver_api::models::ImageCompressionAlgorithm;
|
||||
use remote_storage::{GenericRemoteStorage, ListingMode, RemotePath, RemoteStorageConfig};
|
||||
use std::fs;
|
||||
use tokio::sync::Semaphore;
|
||||
use tokio::task::JoinSet;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use utils::bin_ser::BeSer;
|
||||
use utils::id::{TenantId, TimelineId};
|
||||
|
||||
@@ -55,6 +63,17 @@ pub(crate) enum LayerCmd {
|
||||
#[clap(long)]
|
||||
new_timeline_id: Option<TimelineId>,
|
||||
},
|
||||
CompressOne {
|
||||
dest_path: Utf8PathBuf,
|
||||
layer_file_path: Utf8PathBuf,
|
||||
},
|
||||
CompressMany {
|
||||
tmp_dir: Utf8PathBuf,
|
||||
tenant_remote_prefix: String,
|
||||
tenant_remote_config: String,
|
||||
layers_dir: Utf8PathBuf,
|
||||
parallelism: Option<u32>,
|
||||
},
|
||||
}
|
||||
|
||||
async fn read_delta_file(path: impl AsRef<Path>, ctx: &RequestContext) -> Result<()> {
|
||||
@@ -240,5 +259,138 @@ pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> {
|
||||
|
||||
anyhow::bail!("not an image or delta layer: {layer_file_path}");
|
||||
}
|
||||
LayerCmd::CompressOne {
|
||||
dest_path,
|
||||
layer_file_path,
|
||||
} => {
|
||||
pageserver::virtual_file::init(10, virtual_file::api::IoEngineKind::StdFs);
|
||||
pageserver::page_cache::init(100);
|
||||
|
||||
let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);
|
||||
|
||||
let stats =
|
||||
ImageLayer::compression_statistics(dest_path, layer_file_path, &ctx).await?;
|
||||
println!(
|
||||
"Statistics: {stats:#?}\n{}",
|
||||
serde_json::to_string(&stats).unwrap()
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
LayerCmd::CompressMany {
|
||||
tmp_dir,
|
||||
tenant_remote_prefix,
|
||||
tenant_remote_config,
|
||||
layers_dir,
|
||||
parallelism,
|
||||
} => {
|
||||
pageserver::virtual_file::init(10, virtual_file::api::IoEngineKind::StdFs);
|
||||
pageserver::page_cache::init(100);
|
||||
|
||||
let toml_document = toml_edit::Document::from_str(tenant_remote_config)?;
|
||||
let toml_item = toml_document
|
||||
.get("remote_storage")
|
||||
.expect("need remote_storage");
|
||||
let config = RemoteStorageConfig::from_toml(toml_item)?.expect("incomplete config");
|
||||
let storage = remote_storage::GenericRemoteStorage::from_config(&config)?;
|
||||
let storage = Arc::new(storage);
|
||||
|
||||
let cancel = CancellationToken::new();
|
||||
let path = RemotePath::from_string(tenant_remote_prefix)?;
|
||||
let max_files = NonZeroU32::new(128_000);
|
||||
let files_list = storage
|
||||
.list(Some(&path), ListingMode::NoDelimiter, max_files, &cancel)
|
||||
.await?;
|
||||
|
||||
println!("Listing gave {} keys", files_list.keys.len());
|
||||
|
||||
tokio::fs::create_dir_all(&layers_dir).await?;
|
||||
|
||||
let semaphore = Arc::new(Semaphore::new(parallelism.unwrap_or(1) as usize));
|
||||
|
||||
let mut tasks = JoinSet::new();
|
||||
for (file_idx, file_key) in files_list.keys.iter().enumerate() {
|
||||
let Some(file_name) = file_key.object_name() else {
|
||||
continue;
|
||||
};
|
||||
match LayerName::from_str(file_name) {
|
||||
Ok(LayerName::Delta(_)) => continue,
|
||||
Ok(LayerName::Image(_)) => (),
|
||||
Err(_e) => {
|
||||
// Split off the final part. We ensured above that this is not turning a
|
||||
// generation-less delta layer file name into an image layer file name.
|
||||
let Some(file_without_generation) = file_name.rsplit_once('-') else {
|
||||
continue;
|
||||
};
|
||||
let Ok(LayerName::Image(_layer_file_name)) =
|
||||
LayerName::from_str(file_without_generation.0)
|
||||
else {
|
||||
// Skipping because it's either not a layer or an image layer
|
||||
//println!("object {file_name}: not an image layer");
|
||||
continue;
|
||||
};
|
||||
}
|
||||
}
|
||||
let json_file_path = layers_dir.join(format!("{file_name}.json"));
|
||||
if tokio::fs::try_exists(&json_file_path).await? {
|
||||
//println!("object {file_name}: report already created");
|
||||
// If we have already created a report for the layer, skip it.
|
||||
continue;
|
||||
}
|
||||
let local_layer_path = layers_dir.join(file_name);
|
||||
async fn stats(
|
||||
semaphore: Arc<Semaphore>,
|
||||
local_layer_path: Utf8PathBuf,
|
||||
json_file_path: Utf8PathBuf,
|
||||
tmp_dir: Utf8PathBuf,
|
||||
storage: Arc<GenericRemoteStorage>,
|
||||
file_key: RemotePath,
|
||||
) -> Result<Vec<(Option<ImageCompressionAlgorithm>, u64, u64, u64)>, anyhow::Error>
|
||||
{
|
||||
let _permit = semaphore.acquire().await?;
|
||||
let cancel = CancellationToken::new();
|
||||
let download = storage.download(&file_key, &cancel).await?;
|
||||
let mut dest_layer_file = tokio::fs::File::create(&local_layer_path).await?;
|
||||
let mut body = tokio_util::io::StreamReader::new(download.download_stream);
|
||||
let _size = tokio::io::copy_buf(&mut body, &mut dest_layer_file).await?;
|
||||
println!("Downloaded file to {local_layer_path}");
|
||||
let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);
|
||||
let stats =
|
||||
ImageLayer::compression_statistics(&tmp_dir, &local_layer_path, &ctx)
|
||||
.await?;
|
||||
|
||||
let stats_str = serde_json::to_string(&stats).unwrap();
|
||||
tokio::fs::write(json_file_path, stats_str).await?;
|
||||
|
||||
tokio::fs::remove_file(&local_layer_path).await?;
|
||||
Ok(stats)
|
||||
}
|
||||
let semaphore = semaphore.clone();
|
||||
let file_key = file_key.to_owned();
|
||||
let storage = storage.clone();
|
||||
let tmp_dir = tmp_dir.to_owned();
|
||||
let file_name = file_name.to_owned();
|
||||
let percent = (file_idx * 100) as f64 / files_list.keys.len() as f64;
|
||||
tasks.spawn(async move {
|
||||
let stats = stats(
|
||||
semaphore,
|
||||
local_layer_path.to_owned(),
|
||||
json_file_path.to_owned(),
|
||||
tmp_dir,
|
||||
storage,
|
||||
file_key,
|
||||
)
|
||||
.await;
|
||||
match stats {
|
||||
Ok(stats) => {
|
||||
println!("Statistics for {file_name} ({percent:.1}%): {stats:?}\n")
|
||||
}
|
||||
Err(e) => eprintln!("Error for {file_name}: {e:?}"),
|
||||
};
|
||||
});
|
||||
}
|
||||
while let Some(_res) = tasks.join_next().await {}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -5,7 +5,7 @@
|
||||
//! See also `settings.md` for better description on every parameter.
|
||||
|
||||
use anyhow::{anyhow, bail, ensure, Context, Result};
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use pageserver_api::{models::ImageCompressionAlgorithm, shard::TenantShardId};
|
||||
use remote_storage::{RemotePath, RemoteStorageConfig};
|
||||
use serde;
|
||||
use serde::de::IntoDeserializer;
|
||||
@@ -55,6 +55,7 @@ pub mod defaults {
|
||||
DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_HTTP_LISTEN_PORT, DEFAULT_PG_LISTEN_ADDR,
|
||||
DEFAULT_PG_LISTEN_PORT,
|
||||
};
|
||||
use pageserver_api::models::ImageCompressionAlgorithm;
|
||||
pub use storage_broker::DEFAULT_ENDPOINT as BROKER_DEFAULT_ENDPOINT;
|
||||
|
||||
pub const DEFAULT_WAIT_LSN_TIMEOUT: &str = "60 s";
|
||||
@@ -95,6 +96,8 @@ pub mod defaults {
|
||||
|
||||
pub const DEFAULT_MAX_VECTORED_READ_BYTES: usize = 128 * 1024; // 128 KiB
|
||||
|
||||
pub const DEFAULT_IMAGE_COMPRESSION: Option<ImageCompressionAlgorithm> = None;
|
||||
|
||||
pub const DEFAULT_VALIDATE_VECTORED_GET: bool = true;
|
||||
|
||||
pub const DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB: usize = 0;
|
||||
@@ -290,6 +293,8 @@ pub struct PageServerConf {
|
||||
|
||||
pub validate_vectored_get: bool,
|
||||
|
||||
pub image_compression: Option<ImageCompressionAlgorithm>,
|
||||
|
||||
/// How many bytes of ephemeral layer content will we allow per kilobyte of RAM. When this
|
||||
/// is exceeded, we start proactively closing ephemeral layers to limit the total amount
|
||||
/// of ephemeral data.
|
||||
@@ -400,6 +405,8 @@ struct PageServerConfigBuilder {
|
||||
|
||||
validate_vectored_get: BuilderValue<bool>,
|
||||
|
||||
image_compression: BuilderValue<Option<ImageCompressionAlgorithm>>,
|
||||
|
||||
ephemeral_bytes_per_memory_kb: BuilderValue<usize>,
|
||||
}
|
||||
|
||||
@@ -487,6 +494,7 @@ impl PageServerConfigBuilder {
|
||||
max_vectored_read_bytes: Set(MaxVectoredReadBytes(
|
||||
NonZeroUsize::new(DEFAULT_MAX_VECTORED_READ_BYTES).unwrap(),
|
||||
)),
|
||||
image_compression: Set(DEFAULT_IMAGE_COMPRESSION),
|
||||
validate_vectored_get: Set(DEFAULT_VALIDATE_VECTORED_GET),
|
||||
ephemeral_bytes_per_memory_kb: Set(DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB),
|
||||
}
|
||||
@@ -672,6 +680,10 @@ impl PageServerConfigBuilder {
|
||||
self.validate_vectored_get = BuilderValue::Set(value);
|
||||
}
|
||||
|
||||
pub fn get_image_compression(&mut self, value: Option<ImageCompressionAlgorithm>) {
|
||||
self.image_compression = BuilderValue::Set(value);
|
||||
}
|
||||
|
||||
pub fn get_ephemeral_bytes_per_memory_kb(&mut self, value: usize) {
|
||||
self.ephemeral_bytes_per_memory_kb = BuilderValue::Set(value);
|
||||
}
|
||||
@@ -732,6 +744,7 @@ impl PageServerConfigBuilder {
|
||||
get_impl,
|
||||
max_vectored_read_bytes,
|
||||
validate_vectored_get,
|
||||
image_compression,
|
||||
ephemeral_bytes_per_memory_kb,
|
||||
}
|
||||
CUSTOM LOGIC
|
||||
@@ -1026,6 +1039,9 @@ impl PageServerConf {
|
||||
"validate_vectored_get" => {
|
||||
builder.get_validate_vectored_get(parse_toml_bool("validate_vectored_get", item)?)
|
||||
}
|
||||
"image_compression" => {
|
||||
builder.get_image_compression(Some(parse_toml_from_str("image_compression", item)?))
|
||||
}
|
||||
"ephemeral_bytes_per_memory_kb" => {
|
||||
builder.get_ephemeral_bytes_per_memory_kb(parse_toml_u64("ephemeral_bytes_per_memory_kb", item)? as usize)
|
||||
}
|
||||
@@ -1110,6 +1126,7 @@ impl PageServerConf {
|
||||
NonZeroUsize::new(defaults::DEFAULT_MAX_VECTORED_READ_BYTES)
|
||||
.expect("Invalid default constant"),
|
||||
),
|
||||
image_compression: defaults::DEFAULT_IMAGE_COMPRESSION,
|
||||
validate_vectored_get: defaults::DEFAULT_VALIDATE_VECTORED_GET,
|
||||
ephemeral_bytes_per_memory_kb: defaults::DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB,
|
||||
}
|
||||
@@ -1350,6 +1367,7 @@ background_task_maximum_delay = '334 s'
|
||||
.expect("Invalid default constant")
|
||||
),
|
||||
validate_vectored_get: defaults::DEFAULT_VALIDATE_VECTORED_GET,
|
||||
image_compression: defaults::DEFAULT_IMAGE_COMPRESSION,
|
||||
ephemeral_bytes_per_memory_kb: defaults::DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB,
|
||||
},
|
||||
"Correct defaults should be used when no config values are provided"
|
||||
@@ -1423,6 +1441,7 @@ background_task_maximum_delay = '334 s'
|
||||
.expect("Invalid default constant")
|
||||
),
|
||||
validate_vectored_get: defaults::DEFAULT_VALIDATE_VECTORED_GET,
|
||||
image_compression: defaults::DEFAULT_IMAGE_COMPRESSION,
|
||||
ephemeral_bytes_per_memory_kb: defaults::DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB,
|
||||
},
|
||||
"Should be able to parse all basic config values correctly"
|
||||
|
||||
@@ -11,7 +11,10 @@
|
||||
//! len < 128: 0XXXXXXX
|
||||
//! len >= 128: 1XXXXXXX XXXXXXXX XXXXXXXX XXXXXXXX
|
||||
//!
|
||||
use async_compression::Level;
|
||||
use bytes::{BufMut, BytesMut};
|
||||
use pageserver_api::models::ImageCompressionAlgorithm;
|
||||
use tokio::io::AsyncWriteExt;
|
||||
use tokio_epoll_uring::{BoundedBuf, IoBuf, Slice};
|
||||
|
||||
use crate::context::RequestContext;
|
||||
@@ -66,12 +69,29 @@ impl<'a> BlockCursor<'a> {
|
||||
len_buf.copy_from_slice(&buf[off..off + 4]);
|
||||
off += 4;
|
||||
}
|
||||
len_buf[0] &= 0x7f;
|
||||
len_buf[0] &= 0x0f;
|
||||
u32::from_be_bytes(len_buf) as usize
|
||||
};
|
||||
let compression_bits = first_len_byte & 0xf0;
|
||||
|
||||
dstbuf.clear();
|
||||
dstbuf.reserve(len);
|
||||
let mut tmp_buf = Vec::new();
|
||||
let buf_to_write;
|
||||
let compression = if compression_bits <= BYTE_UNCOMPRESSED {
|
||||
buf_to_write = dstbuf;
|
||||
None
|
||||
} else if compression_bits == BYTE_ZSTD || compression_bits == BYTE_LZ4 {
|
||||
buf_to_write = &mut tmp_buf;
|
||||
Some(dstbuf)
|
||||
} else {
|
||||
let error = std::io::Error::new(
|
||||
std::io::ErrorKind::InvalidData,
|
||||
format!("invalid compression byte {compression_bits:x}"),
|
||||
);
|
||||
return Err(error);
|
||||
};
|
||||
|
||||
buf_to_write.clear();
|
||||
buf_to_write.reserve(len);
|
||||
|
||||
// Read the payload
|
||||
let mut remain = len;
|
||||
@@ -85,14 +105,38 @@ impl<'a> BlockCursor<'a> {
|
||||
page_remain = PAGE_SZ;
|
||||
}
|
||||
let this_blk_len = min(remain, page_remain);
|
||||
dstbuf.extend_from_slice(&buf[off..off + this_blk_len]);
|
||||
buf_to_write.extend_from_slice(&buf[off..off + this_blk_len]);
|
||||
remain -= this_blk_len;
|
||||
off += this_blk_len;
|
||||
}
|
||||
|
||||
if let Some(dstbuf) = compression {
|
||||
if compression_bits == BYTE_ZSTD {
|
||||
let mut decoder = async_compression::tokio::write::ZstdDecoder::new(dstbuf);
|
||||
decoder.write_all(buf_to_write).await?;
|
||||
decoder.flush().await?;
|
||||
} else if compression_bits == BYTE_LZ4 {
|
||||
let decompressed = lz4_flex::block::decompress_size_prepended(&buf_to_write)
|
||||
.map_err(|e| {
|
||||
std::io::Error::new(
|
||||
std::io::ErrorKind::InvalidData,
|
||||
format!("lz4 decompression error: {e:?}"),
|
||||
)
|
||||
})?;
|
||||
dstbuf.extend_from_slice(&decompressed);
|
||||
} else {
|
||||
unreachable!("already checked above")
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
const BYTE_UNCOMPRESSED: u8 = 0x80;
|
||||
const BYTE_ZSTD: u8 = BYTE_UNCOMPRESSED | 0x10;
|
||||
const BYTE_LZ4: u8 = BYTE_UNCOMPRESSED | 0x20;
|
||||
|
||||
/// A wrapper of `VirtualFile` that allows users to write blobs.
|
||||
///
|
||||
/// If a `BlobWriter` is dropped, the internal buffer will be
|
||||
@@ -219,6 +263,17 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
|
||||
&mut self,
|
||||
srcbuf: B,
|
||||
ctx: &RequestContext,
|
||||
) -> (B::Buf, Result<u64, Error>) {
|
||||
self.write_blob_compressed(srcbuf, ctx, None).await
|
||||
}
|
||||
|
||||
/// Write a blob of data. Returns the offset that it was written to,
|
||||
/// which can be used to retrieve the data later.
|
||||
pub async fn write_blob_compressed<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
|
||||
&mut self,
|
||||
srcbuf: B,
|
||||
ctx: &RequestContext,
|
||||
algorithm: Option<ImageCompressionAlgorithm>,
|
||||
) -> (B::Buf, Result<u64, Error>) {
|
||||
let offset = self.offset;
|
||||
|
||||
@@ -226,29 +281,75 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
|
||||
|
||||
let mut io_buf = self.io_buf.take().expect("we always put it back below");
|
||||
io_buf.clear();
|
||||
let (io_buf, hdr_res) = async {
|
||||
let mut compressed_buf = None;
|
||||
let ((io_buf, hdr_res), srcbuf) = async {
|
||||
if len < 128 {
|
||||
// Short blob. Write a 1-byte length header
|
||||
io_buf.put_u8(len as u8);
|
||||
self.write_all(io_buf, ctx).await
|
||||
(
|
||||
self.write_all(io_buf, ctx).await,
|
||||
srcbuf.slice(..).into_inner(),
|
||||
)
|
||||
} else {
|
||||
// Write a 4-byte length header
|
||||
if len > 0x7fff_ffff {
|
||||
if len > 0x0fff_ffff {
|
||||
return (
|
||||
io_buf,
|
||||
Err(Error::new(
|
||||
ErrorKind::Other,
|
||||
format!("blob too large ({len} bytes)"),
|
||||
)),
|
||||
(
|
||||
io_buf,
|
||||
Err(Error::new(
|
||||
ErrorKind::Other,
|
||||
format!("blob too large ({len} bytes)"),
|
||||
)),
|
||||
),
|
||||
srcbuf.slice(..).into_inner(),
|
||||
);
|
||||
}
|
||||
if len > 0x0fff_ffff {
|
||||
tracing::warn!("writing blob above future limit ({len} bytes)");
|
||||
}
|
||||
let mut len_buf = (len as u32).to_be_bytes();
|
||||
len_buf[0] |= 0x80;
|
||||
use ImageCompressionAlgorithm::*;
|
||||
let (high_bit_mask, len_written, srcbuf) = match algorithm {
|
||||
Some(ZstdLow | Zstd | ZstdHigh) => {
|
||||
let mut encoder = if matches!(algorithm, Some(ZstdLow)) {
|
||||
async_compression::tokio::write::ZstdEncoder::with_quality(
|
||||
Vec::new(),
|
||||
Level::Precise(1),
|
||||
)
|
||||
} else if matches!(algorithm, Some(ZstdHigh)) {
|
||||
async_compression::tokio::write::ZstdEncoder::with_quality(
|
||||
Vec::new(),
|
||||
Level::Precise(6),
|
||||
)
|
||||
} else {
|
||||
async_compression::tokio::write::ZstdEncoder::new(Vec::new())
|
||||
};
|
||||
let slice = srcbuf.slice(..);
|
||||
encoder.write_all(&slice[..]).await.unwrap();
|
||||
encoder.shutdown().await.unwrap();
|
||||
let compressed = encoder.into_inner();
|
||||
if compressed.len() < len {
|
||||
let compressed_len = compressed.len();
|
||||
compressed_buf = Some(compressed);
|
||||
(BYTE_ZSTD, compressed_len, slice.into_inner())
|
||||
} else {
|
||||
(BYTE_UNCOMPRESSED, len, slice.into_inner())
|
||||
}
|
||||
}
|
||||
Some(ImageCompressionAlgorithm::LZ4) => {
|
||||
let slice = srcbuf.slice(..);
|
||||
let compressed = lz4_flex::block::compress_prepend_size(&slice[..]);
|
||||
if compressed.len() < len {
|
||||
let compressed_len = compressed.len();
|
||||
compressed_buf = Some(compressed);
|
||||
(BYTE_LZ4, compressed_len, slice.into_inner())
|
||||
} else {
|
||||
(BYTE_UNCOMPRESSED, len, slice.into_inner())
|
||||
}
|
||||
}
|
||||
None => (BYTE_UNCOMPRESSED, len, srcbuf.slice(..).into_inner()),
|
||||
};
|
||||
let mut len_buf = (len_written as u32).to_be_bytes();
|
||||
assert_eq!(len_buf[0] & 0xf0, 0);
|
||||
len_buf[0] |= high_bit_mask;
|
||||
io_buf.extend_from_slice(&len_buf[..]);
|
||||
self.write_all(io_buf, ctx).await
|
||||
(self.write_all(io_buf, ctx).await, srcbuf)
|
||||
}
|
||||
}
|
||||
.await;
|
||||
@@ -257,7 +358,12 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
|
||||
Ok(_) => (),
|
||||
Err(e) => return (Slice::into_inner(srcbuf.slice(..)), Err(e)),
|
||||
}
|
||||
let (srcbuf, res) = self.write_all(srcbuf, ctx).await;
|
||||
let (srcbuf, res) = if let Some(compressed_buf) = compressed_buf {
|
||||
let (_buf, res) = self.write_all(compressed_buf, ctx).await;
|
||||
(Slice::into_inner(srcbuf.slice(..)), res)
|
||||
} else {
|
||||
self.write_all(srcbuf, ctx).await
|
||||
};
|
||||
(srcbuf, res.map(|_| offset))
|
||||
}
|
||||
}
|
||||
@@ -295,6 +401,12 @@ mod tests {
|
||||
use rand::{Rng, SeedableRng};
|
||||
|
||||
async fn round_trip_test<const BUFFERED: bool>(blobs: &[Vec<u8>]) -> Result<(), Error> {
|
||||
round_trip_test_compressed::<BUFFERED, 0>(blobs).await
|
||||
}
|
||||
|
||||
async fn round_trip_test_compressed<const BUFFERED: bool, const COMPRESSION: u8>(
|
||||
blobs: &[Vec<u8>],
|
||||
) -> Result<(), Error> {
|
||||
let temp_dir = camino_tempfile::tempdir()?;
|
||||
let pathbuf = temp_dir.path().join("file");
|
||||
let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
|
||||
@@ -305,7 +417,26 @@ mod tests {
|
||||
let file = VirtualFile::create(pathbuf.as_path(), &ctx).await?;
|
||||
let mut wtr = BlobWriter::<BUFFERED>::new(file, 0);
|
||||
for blob in blobs.iter() {
|
||||
let (_, res) = wtr.write_blob(blob.clone(), &ctx).await;
|
||||
let (_, res) = match COMPRESSION {
|
||||
0 => wtr.write_blob(blob.clone(), &ctx).await,
|
||||
1 => {
|
||||
wtr.write_blob_compressed(
|
||||
blob.clone(),
|
||||
&ctx,
|
||||
Some(ImageCompressionAlgorithm::ZstdLow),
|
||||
)
|
||||
.await
|
||||
}
|
||||
2 => {
|
||||
wtr.write_blob_compressed(
|
||||
blob.clone(),
|
||||
&ctx,
|
||||
Some(ImageCompressionAlgorithm::LZ4),
|
||||
)
|
||||
.await
|
||||
}
|
||||
_ => unreachable!("Invalid compression {COMPRESSION}"),
|
||||
};
|
||||
let offs = res?;
|
||||
offsets.push(offs);
|
||||
}
|
||||
@@ -361,10 +492,17 @@ mod tests {
|
||||
let blobs = &[
|
||||
b"test".to_vec(),
|
||||
random_array(10 * PAGE_SZ),
|
||||
b"hello".to_vec(),
|
||||
random_array(66 * PAGE_SZ),
|
||||
vec![0xf3; 24 * PAGE_SZ],
|
||||
b"foobar".to_vec(),
|
||||
];
|
||||
round_trip_test::<false>(blobs).await?;
|
||||
round_trip_test::<true>(blobs).await?;
|
||||
round_trip_test_compressed::<false, 1>(blobs).await?;
|
||||
round_trip_test_compressed::<true, 1>(blobs).await?;
|
||||
round_trip_test_compressed::<false, 2>(blobs).await?;
|
||||
round_trip_test_compressed::<true, 2>(blobs).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
@@ -318,7 +318,7 @@ pub(crate) struct LayerFringe {
|
||||
#[derive(Debug)]
|
||||
struct LayerKeyspace {
|
||||
layer: ReadableLayer,
|
||||
target_keyspace: Vec<KeySpace>,
|
||||
target_keyspace: KeySpaceRandomAccum,
|
||||
}
|
||||
|
||||
impl LayerFringe {
|
||||
@@ -342,17 +342,13 @@ impl LayerFringe {
|
||||
_,
|
||||
LayerKeyspace {
|
||||
layer,
|
||||
target_keyspace,
|
||||
mut target_keyspace,
|
||||
},
|
||||
)) => {
|
||||
let mut keyspace = KeySpaceRandomAccum::new();
|
||||
for ks in target_keyspace {
|
||||
for part in ks.ranges {
|
||||
keyspace.add_range(part);
|
||||
}
|
||||
}
|
||||
Some((layer, keyspace.consume_keyspace(), read_desc.lsn_range))
|
||||
}
|
||||
)) => Some((
|
||||
layer,
|
||||
target_keyspace.consume_keyspace(),
|
||||
read_desc.lsn_range,
|
||||
)),
|
||||
None => unreachable!("fringe internals are always consistent"),
|
||||
}
|
||||
}
|
||||
@@ -367,16 +363,18 @@ impl LayerFringe {
|
||||
let entry = self.layers.entry(layer_id.clone());
|
||||
match entry {
|
||||
Entry::Occupied(mut entry) => {
|
||||
entry.get_mut().target_keyspace.push(keyspace);
|
||||
entry.get_mut().target_keyspace.add_keyspace(keyspace);
|
||||
}
|
||||
Entry::Vacant(entry) => {
|
||||
self.planned_reads_by_lsn.push(ReadDesc {
|
||||
lsn_range,
|
||||
layer_id: layer_id.clone(),
|
||||
});
|
||||
let mut accum = KeySpaceRandomAccum::new();
|
||||
accum.add_keyspace(keyspace);
|
||||
entry.insert(LayerKeyspace {
|
||||
layer,
|
||||
target_keyspace: vec![keyspace],
|
||||
target_keyspace: accum,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@@ -46,17 +46,19 @@ use camino::{Utf8Path, Utf8PathBuf};
|
||||
use hex;
|
||||
use itertools::Itertools;
|
||||
use pageserver_api::keyspace::KeySpace;
|
||||
use pageserver_api::models::LayerAccessKind;
|
||||
use pageserver_api::models::{ImageCompressionAlgorithm, LayerAccessKind};
|
||||
use pageserver_api::shard::{ShardIdentity, TenantShardId};
|
||||
use rand::{distributions::Alphanumeric, Rng};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::fs::File;
|
||||
use std::io::SeekFrom;
|
||||
use std::ops::Range;
|
||||
use std::os::unix::fs::MetadataExt;
|
||||
use std::os::unix::prelude::FileExt;
|
||||
use std::str::FromStr;
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::OnceCell;
|
||||
use tokio::time::Instant;
|
||||
use tokio_stream::StreamExt;
|
||||
use tracing::*;
|
||||
|
||||
@@ -366,6 +368,170 @@ impl ImageLayer {
|
||||
res?;
|
||||
Ok(())
|
||||
}
|
||||
pub async fn compression_statistics(
|
||||
dest_repo_path: &Utf8Path,
|
||||
path: &Utf8Path,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<Vec<(Option<ImageCompressionAlgorithm>, u64, u64, u64)>> {
|
||||
fn make_conf(
|
||||
image_compression: Option<ImageCompressionAlgorithm>,
|
||||
dest_repo_path: &Utf8Path,
|
||||
) -> &'static PageServerConf {
|
||||
let mut conf = PageServerConf::dummy_conf(dest_repo_path.to_owned());
|
||||
conf.image_compression = image_compression;
|
||||
Box::leak(Box::new(conf))
|
||||
}
|
||||
let image_compressions = [
|
||||
None,
|
||||
Some(ImageCompressionAlgorithm::ZstdLow),
|
||||
Some(ImageCompressionAlgorithm::Zstd),
|
||||
Some(ImageCompressionAlgorithm::ZstdHigh),
|
||||
Some(ImageCompressionAlgorithm::LZ4),
|
||||
];
|
||||
let confs = image_compressions
|
||||
.clone()
|
||||
.map(|compression| make_conf(compression, dest_repo_path));
|
||||
let mut stats = Vec::new();
|
||||
for (image_compression, conf) in image_compressions.into_iter().zip(confs) {
|
||||
let start_compression = Instant::now();
|
||||
let compressed_path = Self::compress_for_conf(path, ctx, conf).await?;
|
||||
let path_to_delete = compressed_path.clone();
|
||||
scopeguard::defer!({
|
||||
let _ = std::fs::remove_file(path_to_delete);
|
||||
});
|
||||
let size = path.metadata()?.size();
|
||||
let elapsed_ms = start_compression.elapsed().as_millis() as u64;
|
||||
let start_decompression = Instant::now();
|
||||
Self::compare_are_equal(path, &compressed_path, ctx, &image_compression).await?;
|
||||
let elapsed_decompression_ms = start_decompression.elapsed().as_millis() as u64;
|
||||
stats.push((
|
||||
image_compression,
|
||||
size,
|
||||
elapsed_ms,
|
||||
elapsed_decompression_ms,
|
||||
));
|
||||
tokio::task::yield_now().await;
|
||||
}
|
||||
Ok(stats)
|
||||
}
|
||||
|
||||
async fn compress_for_conf(
|
||||
path: &Utf8Path,
|
||||
ctx: &RequestContext,
|
||||
conf: &'static PageServerConf,
|
||||
) -> anyhow::Result<Utf8PathBuf> {
|
||||
let file =
|
||||
VirtualFile::open_with_options(path, virtual_file::OpenOptions::new().read(true), ctx)
|
||||
.await
|
||||
.with_context(|| format!("Failed to open file '{}'", path))?;
|
||||
|
||||
let file_id = page_cache::next_file_id();
|
||||
let block_reader = FileBlockReader::new(&file, file_id);
|
||||
let summary_blk = block_reader.read_blk(0, ctx).await?;
|
||||
let summary = Summary::des_prefix(summary_blk.as_ref()).context("deserialize")?;
|
||||
if summary.magic != IMAGE_FILE_MAGIC {
|
||||
anyhow::bail!("magic file mismatch");
|
||||
}
|
||||
|
||||
let tree_reader = DiskBtreeReader::new(
|
||||
summary.index_start_blk,
|
||||
summary.index_root_blk,
|
||||
&block_reader,
|
||||
);
|
||||
|
||||
let mut key_offset_stream =
|
||||
std::pin::pin!(tree_reader.get_stream_from(&[0u8; KEY_SIZE], ctx));
|
||||
|
||||
let tenant_shard_id = TenantShardId::unsharded(summary.tenant_id);
|
||||
let timeline_path = conf.timeline_path(&tenant_shard_id, &summary.timeline_id);
|
||||
tokio::fs::create_dir_all(timeline_path).await?;
|
||||
|
||||
let mut writer = ImageLayerWriter::new(
|
||||
conf,
|
||||
summary.timeline_id,
|
||||
tenant_shard_id,
|
||||
&summary.key_range,
|
||||
summary.lsn,
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let cursor = block_reader.block_cursor();
|
||||
while let Some(r) = key_offset_stream.next().await {
|
||||
let (key, offset) = r?;
|
||||
let key = Key::from_slice(&key);
|
||||
let content = cursor.read_blob(offset, ctx).await?;
|
||||
writer.put_image(key, content.into(), ctx).await?;
|
||||
}
|
||||
let path = writer.inner.take().unwrap().finish_inner(ctx).await?.2;
|
||||
Ok(path)
|
||||
}
|
||||
|
||||
async fn compare_are_equal(
|
||||
path_a: &Utf8Path,
|
||||
path_b: &Utf8Path,
|
||||
ctx: &RequestContext,
|
||||
cmp: &Option<ImageCompressionAlgorithm>,
|
||||
) -> anyhow::Result<()> {
|
||||
let mut files = Vec::new();
|
||||
for path in [path_a, path_b] {
|
||||
let file = VirtualFile::open_with_options(
|
||||
path,
|
||||
virtual_file::OpenOptions::new().read(true),
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
.with_context(|| format!("Failed to open file '{}'", path))?;
|
||||
files.push(file);
|
||||
}
|
||||
|
||||
let mut readers_summaries = Vec::new();
|
||||
for file in files.iter() {
|
||||
let file_id = page_cache::next_file_id();
|
||||
let block_reader = FileBlockReader::new(&file, file_id);
|
||||
let summary_blk = block_reader.read_blk(0, ctx).await?;
|
||||
let summary = Summary::des_prefix(summary_blk.as_ref()).context("deserialize")?;
|
||||
if summary.magic != IMAGE_FILE_MAGIC {
|
||||
anyhow::bail!("magic file mismatch");
|
||||
}
|
||||
readers_summaries.push((block_reader, summary));
|
||||
}
|
||||
|
||||
let mut tree_readers_cursors = Vec::new();
|
||||
for (block_reader, summary) in readers_summaries.iter() {
|
||||
let tree_reader = DiskBtreeReader::new(
|
||||
summary.index_start_blk,
|
||||
summary.index_root_blk,
|
||||
block_reader,
|
||||
);
|
||||
let cursor = block_reader.block_cursor();
|
||||
tree_readers_cursors.push((tree_reader, cursor));
|
||||
}
|
||||
|
||||
let mut key_offset_stream_a = std::pin::pin!(tree_readers_cursors[0]
|
||||
.0
|
||||
.get_stream_from(&[0u8; KEY_SIZE], ctx));
|
||||
let mut key_offset_stream_b = std::pin::pin!(tree_readers_cursors[1]
|
||||
.0
|
||||
.get_stream_from(&[0u8; KEY_SIZE], ctx));
|
||||
while let Some(r) = key_offset_stream_a.next().await {
|
||||
let (key_a, offset_a): (Vec<u8>, _) = r?;
|
||||
let Some(r) = key_offset_stream_b.next().await else {
|
||||
panic!("second file at {path_b} has fewer keys than {path_a}");
|
||||
};
|
||||
let (key_b, offset_b): (Vec<u8>, _) = r?;
|
||||
assert_eq!(key_a, key_b, "mismatch of keys for {path_a}:{path_b}");
|
||||
let key = Key::from_slice(&key_a);
|
||||
let content_a = tree_readers_cursors[0].1.read_blob(offset_a, ctx).await?;
|
||||
let content_b = tree_readers_cursors[1].1.read_blob(offset_b, ctx).await?;
|
||||
assert_eq!(
|
||||
content_a, content_b,
|
||||
"mismatch for key={key} cmp={cmp:?} and {path_a}:{path_b}"
|
||||
);
|
||||
//println!("match for key={key} cmp={cmp:?} from {path_a}");
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl ImageLayerInner {
|
||||
@@ -782,7 +948,10 @@ impl ImageLayerWriterInner {
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
ensure!(self.key_range.contains(&key));
|
||||
let (_img, res) = self.blob_writer.write_blob(img, ctx).await;
|
||||
let (_img, res) = self
|
||||
.blob_writer
|
||||
.write_blob_compressed(img, ctx, self.conf.image_compression)
|
||||
.await;
|
||||
// TODO: re-use the buffer for `img` further upstack
|
||||
let off = res?;
|
||||
|
||||
@@ -796,11 +965,10 @@ impl ImageLayerWriterInner {
|
||||
///
|
||||
/// Finish writing the image layer.
|
||||
///
|
||||
async fn finish(
|
||||
async fn finish_inner(
|
||||
self,
|
||||
timeline: &Arc<Timeline>,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<ResidentLayer> {
|
||||
) -> anyhow::Result<(&'static PageServerConf, PersistentLayerDesc, Utf8PathBuf)> {
|
||||
let index_start_blk =
|
||||
((self.blob_writer.size() + PAGE_SZ as u64 - 1) / PAGE_SZ as u64) as u32;
|
||||
|
||||
@@ -854,8 +1022,16 @@ impl ImageLayerWriterInner {
|
||||
// fsync the file
|
||||
file.sync_all().await?;
|
||||
|
||||
Ok((self.conf, desc, self.path))
|
||||
}
|
||||
async fn finish(
|
||||
self,
|
||||
timeline: &Arc<Timeline>,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<ResidentLayer> {
|
||||
let (conf, desc, path) = self.finish_inner(ctx).await?;
|
||||
// FIXME: why not carry the virtualfile here, it supports renaming?
|
||||
let layer = Layer::finish_creating(self.conf, timeline, desc, &self.path)?;
|
||||
let layer = Layer::finish_creating(conf, timeline, desc, &path)?;
|
||||
|
||||
info!("created image layer {}", layer.local_path());
|
||||
|
||||
@@ -923,6 +1099,12 @@ impl ImageLayerWriter {
|
||||
self.inner.as_mut().unwrap().put_image(key, img, ctx).await
|
||||
}
|
||||
|
||||
/// Obtains the current size of the file
|
||||
pub(crate) fn size(&self) -> u64 {
|
||||
let inner = self.inner.as_ref().unwrap();
|
||||
inner.blob_writer.size() + inner.tree.borrow_writer().size() + PAGE_SZ as u64
|
||||
}
|
||||
|
||||
///
|
||||
/// Finish writing the image layer.
|
||||
///
|
||||
|
||||
63
scripts/plot-compression-report.py
Executable file
63
scripts/plot-compression-report.py
Executable file
@@ -0,0 +1,63 @@
|
||||
#!/usr/bin/env -S python3 -u
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import os
|
||||
from pprint import pprint
|
||||
|
||||
import matplotlib.pyplot as plt
|
||||
|
||||
parser = argparse.ArgumentParser(prog="compression-report")
|
||||
parser.add_argument("dir")
|
||||
args = parser.parse_args()
|
||||
|
||||
files = []
|
||||
for file_name in os.listdir(args.dir):
|
||||
if not file_name.endswith(".json"):
|
||||
continue
|
||||
file_path = os.path.join(args.dir, file_name)
|
||||
with open(file_path) as json_str:
|
||||
json_data = json.load(json_str)
|
||||
files.append((file_name, json_data))
|
||||
#pprint(files)
|
||||
|
||||
extra_zstd_lines = True
|
||||
dc = 2 # data column to use (1 for sizes, 2 for time)
|
||||
sort_by = "ZstdHigh"
|
||||
files.sort(key=lambda file_data: [x for x in file_data[1] if x[0] == sort_by][0][dc])
|
||||
|
||||
|
||||
x_axis = []
|
||||
data_baseline = []
|
||||
data_lz4 = []
|
||||
data_zstd = []
|
||||
data_zstd_low = []
|
||||
data_zstd_high = []
|
||||
|
||||
for idx, f in enumerate(files):
|
||||
file_data = f[1]
|
||||
#pprint(file_data)
|
||||
|
||||
x_axis.append(idx)
|
||||
data_baseline.append([x for x in file_data if x[0] is None][0][dc])
|
||||
data_lz4.append([x for x in file_data if x[0] == "LZ4"][0][dc])
|
||||
data_zstd.append([x for x in file_data if x[0] == "Zstd"][0][dc])
|
||||
if extra_zstd_lines:
|
||||
data_zstd_low.append([x for x in file_data if x[0] == "ZstdLow"][0][dc])
|
||||
data_zstd_high.append([x for x in file_data if x[0] == "ZstdHigh"][0][dc])
|
||||
|
||||
plt.plot(x_axis, data_baseline, "x", markeredgewidth=2, label="baseline")
|
||||
plt.plot(x_axis, data_lz4, "x", markeredgewidth=2, label="lz4")
|
||||
plt.plot(x_axis, data_zstd, "x", markeredgewidth=2, label="Zstd")
|
||||
if extra_zstd_lines:
|
||||
plt.plot(x_axis, data_zstd_low, "x", markeredgewidth=2, label="ZstdLow")
|
||||
plt.plot(x_axis, data_zstd_high, "x", markeredgewidth=2, label="ZstdHigh")
|
||||
|
||||
# plt.style.use('_mpl-gallery')
|
||||
plt.ylim(bottom=0)
|
||||
plt.legend(loc="upper left")
|
||||
|
||||
figure_path = os.path.join(args.dir, "figure.png")
|
||||
print(f"saving figure to {figure_path}")
|
||||
plt.savefig(figure_path)
|
||||
plt.show()
|
||||
@@ -31,6 +31,7 @@ pub(crate) enum PageserverState {
|
||||
Available {
|
||||
last_seen_at: Instant,
|
||||
utilization: PageserverUtilization,
|
||||
new: bool,
|
||||
},
|
||||
Offline,
|
||||
}
|
||||
@@ -127,6 +128,7 @@ impl HeartbeaterTask {
|
||||
heartbeat_futs.push({
|
||||
let jwt_token = self.jwt_token.clone();
|
||||
let cancel = self.cancel.clone();
|
||||
let new_node = !self.state.contains_key(node_id);
|
||||
|
||||
// Clone the node and mark it as available such that the request
|
||||
// goes through to the pageserver even when the node is marked offline.
|
||||
@@ -159,6 +161,7 @@ impl HeartbeaterTask {
|
||||
PageserverState::Available {
|
||||
last_seen_at: Instant::now(),
|
||||
utilization,
|
||||
new: new_node,
|
||||
}
|
||||
} else {
|
||||
PageserverState::Offline
|
||||
@@ -220,6 +223,7 @@ impl HeartbeaterTask {
|
||||
}
|
||||
},
|
||||
Vacant(_) => {
|
||||
// This is a new node. Don't generate a delta for it.
|
||||
deltas.push((node_id, ps_state.clone()));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3,7 +3,7 @@ use std::{str::FromStr, time::Duration};
|
||||
use pageserver_api::{
|
||||
controller_api::{
|
||||
NodeAvailability, NodeDescribeResponse, NodeRegisterRequest, NodeSchedulingPolicy,
|
||||
TenantLocateResponseShard,
|
||||
TenantLocateResponseShard, UtilizationScore,
|
||||
},
|
||||
shard::TenantShardId,
|
||||
};
|
||||
@@ -116,6 +116,16 @@ impl Node {
|
||||
match (self.availability, availability) {
|
||||
(Offline, Active(_)) => ToActive,
|
||||
(Active(_), Offline) => ToOffline,
|
||||
// Consider the case when the storage controller handles the re-attach of a node
|
||||
// before the heartbeats detect that the node is back online. We still need
|
||||
// [`Service::node_configure`] to attempt reconciliations for shards with an
|
||||
// unknown observed location.
|
||||
// The unsavoury match arm below handles this situation.
|
||||
(Active(lhs), Active(rhs))
|
||||
if lhs == UtilizationScore::worst() && rhs < UtilizationScore::worst() =>
|
||||
{
|
||||
ToActive
|
||||
}
|
||||
_ => Unchanged,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -12,7 +12,7 @@ use crate::{
|
||||
id_lock_map::{trace_exclusive_lock, trace_shared_lock, IdLockMap, WrappedWriteGuard},
|
||||
persistence::{AbortShardSplitStatus, TenantFilter},
|
||||
reconciler::{ReconcileError, ReconcileUnits},
|
||||
scheduler::{ScheduleContext, ScheduleMode},
|
||||
scheduler::{MaySchedule, ScheduleContext, ScheduleMode},
|
||||
tenant_shard::{
|
||||
MigrateAttachment, ReconcileNeeded, ScheduleOptimization, ScheduleOptimizationAction,
|
||||
},
|
||||
@@ -747,29 +747,61 @@ impl Service {
|
||||
let res = self.heartbeater.heartbeat(nodes).await;
|
||||
if let Ok(deltas) = res {
|
||||
for (node_id, state) in deltas.0 {
|
||||
let new_availability = match state {
|
||||
PageserverState::Available { utilization, .. } => NodeAvailability::Active(
|
||||
UtilizationScore(utilization.utilization_score),
|
||||
let (new_node, new_availability) = match state {
|
||||
PageserverState::Available {
|
||||
utilization, new, ..
|
||||
} => (
|
||||
new,
|
||||
NodeAvailability::Active(UtilizationScore(
|
||||
utilization.utilization_score,
|
||||
)),
|
||||
),
|
||||
PageserverState::Offline => NodeAvailability::Offline,
|
||||
PageserverState::Offline => (false, NodeAvailability::Offline),
|
||||
};
|
||||
let res = self
|
||||
.node_configure(node_id, Some(new_availability), None)
|
||||
.await;
|
||||
|
||||
match res {
|
||||
Ok(()) => {}
|
||||
Err(ApiError::NotFound(_)) => {
|
||||
// This should be rare, but legitimate since the heartbeats are done
|
||||
// on a snapshot of the nodes.
|
||||
tracing::info!("Node {} was not found after heartbeat round", node_id);
|
||||
if new_node {
|
||||
// When the heartbeats detect a newly added node, we don't wish
|
||||
// to attempt to reconcile the shards assigned to it. The node
|
||||
// is likely handling it's re-attach response, so reconciling now
|
||||
// would be counterproductive.
|
||||
//
|
||||
// Instead, update the in-memory state with the details learned about the
|
||||
// node.
|
||||
let mut locked = self.inner.write().unwrap();
|
||||
let (nodes, _tenants, scheduler) = locked.parts_mut();
|
||||
|
||||
let mut new_nodes = (**nodes).clone();
|
||||
|
||||
if let Some(node) = new_nodes.get_mut(&node_id) {
|
||||
node.set_availability(new_availability);
|
||||
scheduler.node_upsert(node);
|
||||
}
|
||||
Err(err) => {
|
||||
tracing::error!(
|
||||
"Failed to update node {} after heartbeat round: {}",
|
||||
node_id,
|
||||
err
|
||||
);
|
||||
|
||||
locked.nodes = Arc::new(new_nodes);
|
||||
} else {
|
||||
// This is the code path for geniune availability transitions (i.e node
|
||||
// goes unavailable and/or comes back online).
|
||||
let res = self
|
||||
.node_configure(node_id, Some(new_availability), None)
|
||||
.await;
|
||||
|
||||
match res {
|
||||
Ok(()) => {}
|
||||
Err(ApiError::NotFound(_)) => {
|
||||
// This should be rare, but legitimate since the heartbeats are done
|
||||
// on a snapshot of the nodes.
|
||||
tracing::info!(
|
||||
"Node {} was not found after heartbeat round",
|
||||
node_id
|
||||
);
|
||||
}
|
||||
Err(err) => {
|
||||
tracing::error!(
|
||||
"Failed to update node {} after heartbeat round: {}",
|
||||
node_id,
|
||||
err
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -4316,6 +4348,16 @@ impl Service {
|
||||
continue;
|
||||
}
|
||||
|
||||
if !new_nodes
|
||||
.values()
|
||||
.any(|n| matches!(n.may_schedule(), MaySchedule::Yes(_)))
|
||||
{
|
||||
// Special case for when all nodes are unavailable and/or unschedulable: there is no point
|
||||
// trying to reschedule since there's nowhere else to go. Without this
|
||||
// branch we incorrectly detach tenants in response to node unavailability.
|
||||
continue;
|
||||
}
|
||||
|
||||
if tenant_shard.intent.demote_attached(scheduler, node_id) {
|
||||
tenant_shard.sequence = tenant_shard.sequence.next();
|
||||
|
||||
@@ -4353,6 +4395,12 @@ impl Service {
|
||||
// When a node comes back online, we must reconcile any tenant that has a None observed
|
||||
// location on the node.
|
||||
for tenant_shard in locked.tenants.values_mut() {
|
||||
// If a reconciliation is already in progress, rely on the previous scheduling
|
||||
// decision and skip triggering a new reconciliation.
|
||||
if tenant_shard.reconciler.is_some() {
|
||||
continue;
|
||||
}
|
||||
|
||||
if let Some(observed_loc) = tenant_shard.observed.locations.get_mut(&node_id) {
|
||||
if observed_loc.conf.is_none() {
|
||||
self.maybe_reconcile_shard(tenant_shard, &new_nodes);
|
||||
|
||||
15
test_runner/performance/pgvector/halfvec_build.sql
Normal file
15
test_runner/performance/pgvector/halfvec_build.sql
Normal file
@@ -0,0 +1,15 @@
|
||||
DROP TABLE IF EXISTS halfvec_test_table;
|
||||
|
||||
CREATE TABLE halfvec_test_table (
|
||||
_id text NOT NULL,
|
||||
title text,
|
||||
text text,
|
||||
embeddings halfvec(1536),
|
||||
PRIMARY KEY (_id)
|
||||
);
|
||||
|
||||
INSERT INTO halfvec_test_table (_id, title, text, embeddings)
|
||||
SELECT _id, title, text, embeddings::halfvec
|
||||
FROM documents;
|
||||
|
||||
CREATE INDEX documents_half_precision_hnsw_idx ON halfvec_test_table USING hnsw (embeddings halfvec_cosine_ops) WITH (m = 64, ef_construction = 128);
|
||||
@@ -0,0 +1,13 @@
|
||||
-- run with pooled connection
|
||||
-- pgbench -T 300 -c 100 -j20 -f pgbench_halfvec_queries.sql -postgresql://neondb_owner:<secret>@ep-floral-thunder-w1gzhaxi-pooler.eu-west-1.aws.neon.build/neondb?sslmode=require"
|
||||
|
||||
with x (x) as (
|
||||
select "embeddings" as x
|
||||
from halfvec_test_table
|
||||
TABLESAMPLE SYSTEM (1)
|
||||
LIMIT 1
|
||||
)
|
||||
SELECT title, "embeddings" <=> (select x from x) as distance
|
||||
FROM halfvec_test_table
|
||||
ORDER BY 2
|
||||
LIMIT 30;
|
||||
@@ -1,13 +0,0 @@
|
||||
-- run with pooled connection
|
||||
-- pgbench -T 300 -c 100 -j20 -f pgbench_hnsw_queries.sql -postgresql://neondb_owner:<secret>@ep-floral-thunder-w1gzhaxi-pooler.eu-west-1.aws.neon.build/neondb?sslmode=require"
|
||||
|
||||
with x (x) as (
|
||||
select "embeddings" as x
|
||||
from hnsw_test_table
|
||||
TABLESAMPLE SYSTEM (1)
|
||||
LIMIT 1
|
||||
)
|
||||
SELECT title, "embeddings" <=> (select x from x) as distance
|
||||
FROM hnsw_test_table
|
||||
ORDER BY 2
|
||||
LIMIT 30;
|
||||
@@ -106,6 +106,7 @@ QUERIES: Tuple[LabelledQuery, ...] = (
|
||||
# Disable auto formatting for the list of queries so that it's easier to read
|
||||
# fmt: off
|
||||
PGVECTOR_QUERIES: Tuple[LabelledQuery, ...] = (
|
||||
LabelledQuery("PGVPREP", r"ALTER EXTENSION VECTOR UPDATE;"),
|
||||
LabelledQuery("PGV0", r"DROP TABLE IF EXISTS hnsw_test_table;"),
|
||||
LabelledQuery("PGV1", r"CREATE TABLE hnsw_test_table AS TABLE documents WITH NO DATA;"),
|
||||
LabelledQuery("PGV2", r"INSERT INTO hnsw_test_table SELECT * FROM documents;"),
|
||||
@@ -115,6 +116,10 @@ PGVECTOR_QUERIES: Tuple[LabelledQuery, ...] = (
|
||||
LabelledQuery("PGV6", r"CREATE INDEX ON hnsw_test_table USING hnsw (embeddings vector_l1_ops);"),
|
||||
LabelledQuery("PGV7", r"CREATE INDEX ON hnsw_test_table USING hnsw ((binary_quantize(embeddings)::bit(1536)) bit_hamming_ops);"),
|
||||
LabelledQuery("PGV8", r"CREATE INDEX ON hnsw_test_table USING hnsw ((binary_quantize(embeddings)::bit(1536)) bit_jaccard_ops);"),
|
||||
LabelledQuery("PGV9", r"DROP TABLE IF EXISTS halfvec_test_table;"),
|
||||
LabelledQuery("PGV10", r"CREATE TABLE halfvec_test_table (_id text NOT NULL, title text, text text, embeddings halfvec(1536), PRIMARY KEY (_id));"),
|
||||
LabelledQuery("PGV11", r"INSERT INTO halfvec_test_table (_id, title, text, embeddings) SELECT _id, title, text, embeddings::halfvec FROM documents;"),
|
||||
LabelledQuery("PGV12", r"CREATE INDEX documents_half_precision_hnsw_idx ON halfvec_test_table USING hnsw (embeddings halfvec_cosine_ops) WITH (m = 64, ef_construction = 128);"),
|
||||
)
|
||||
# fmt: on
|
||||
|
||||
|
||||
@@ -18,6 +18,7 @@ class PgBenchLoadType(enum.Enum):
|
||||
SIMPLE_UPDATE = "simple-update"
|
||||
SELECT_ONLY = "select-only"
|
||||
PGVECTOR_HNSW = "pgvector-hnsw"
|
||||
PGVECTOR_HALFVEC = "pgvector-halfvec"
|
||||
|
||||
|
||||
def utc_now_timestamp() -> int:
|
||||
@@ -153,6 +154,26 @@ def run_test_pgbench(env: PgCompare, scale: int, duration: int, workload_type: P
|
||||
password=password,
|
||||
)
|
||||
|
||||
if workload_type == PgBenchLoadType.PGVECTOR_HALFVEC:
|
||||
# Run simple-update workload
|
||||
run_pgbench(
|
||||
env,
|
||||
"pgvector-halfvec",
|
||||
[
|
||||
"pgbench",
|
||||
"-f",
|
||||
"test_runner/performance/pgvector/pgbench_custom_script_pgvector_halfvec_queries.sql",
|
||||
"-c100",
|
||||
"-j20",
|
||||
f"-T{duration}",
|
||||
"-P2",
|
||||
"--protocol=prepared",
|
||||
"--progress-timestamp",
|
||||
connstr,
|
||||
],
|
||||
password=password,
|
||||
)
|
||||
|
||||
env.report_size()
|
||||
|
||||
|
||||
@@ -222,13 +243,3 @@ def test_pgbench_remote_simple_update(remote_compare: PgCompare, scale: int, dur
|
||||
@pytest.mark.remote_cluster
|
||||
def test_pgbench_remote_select_only(remote_compare: PgCompare, scale: int, duration: int):
|
||||
run_test_pgbench(remote_compare, scale, duration, PgBenchLoadType.SELECT_ONLY)
|
||||
|
||||
|
||||
# The following test runs on an existing database that has pgvector extension installed
|
||||
# and a table with 1 million embedding vectors loaded and indexed with HNSW.
|
||||
#
|
||||
# Run this pgbench tests against an existing remote Postgres cluster with the necessary setup.
|
||||
@pytest.mark.parametrize("duration", get_durations_matrix())
|
||||
@pytest.mark.remote_cluster
|
||||
def test_pgbench_remote_pgvector(remote_compare: PgCompare, duration: int):
|
||||
run_test_pgbench(remote_compare, 1, duration, PgBenchLoadType.PGVECTOR_HNSW)
|
||||
|
||||
24
test_runner/performance/test_perf_pgvector_queries.py
Normal file
24
test_runner/performance/test_perf_pgvector_queries.py
Normal file
@@ -0,0 +1,24 @@
|
||||
import pytest
|
||||
from fixtures.compare_fixtures import PgCompare
|
||||
|
||||
from performance.test_perf_pgbench import PgBenchLoadType, get_durations_matrix, run_test_pgbench
|
||||
|
||||
|
||||
# The following test runs on an existing database that has pgvector extension installed
|
||||
# and a table with 1 million embedding vectors loaded and indexed with HNSW.
|
||||
#
|
||||
# Run this pgbench tests against an existing remote Postgres cluster with the necessary setup.
|
||||
@pytest.mark.parametrize("duration", get_durations_matrix())
|
||||
@pytest.mark.remote_cluster
|
||||
def test_pgbench_remote_pgvector_hnsw(remote_compare: PgCompare, duration: int):
|
||||
run_test_pgbench(remote_compare, 1, duration, PgBenchLoadType.PGVECTOR_HNSW)
|
||||
|
||||
|
||||
# The following test runs on an existing database that has pgvector extension installed
|
||||
# and a table with 1 million embedding vectors loaded and indexed with halfvec.
|
||||
#
|
||||
# Run this pgbench tests against an existing remote Postgres cluster with the necessary setup.
|
||||
@pytest.mark.parametrize("duration", get_durations_matrix())
|
||||
@pytest.mark.remote_cluster
|
||||
def test_pgbench_remote_pgvector_halfvec(remote_compare: PgCompare, duration: int):
|
||||
run_test_pgbench(remote_compare, 1, duration, PgBenchLoadType.PGVECTOR_HALFVEC)
|
||||
@@ -934,19 +934,27 @@ class Failure:
|
||||
def clear(self, env: NeonEnv):
|
||||
raise NotImplementedError()
|
||||
|
||||
def nodes(self):
|
||||
raise NotImplementedError()
|
||||
|
||||
|
||||
class NodeStop(Failure):
|
||||
def __init__(self, pageserver_id, immediate):
|
||||
self.pageserver_id = pageserver_id
|
||||
def __init__(self, pageserver_ids, immediate):
|
||||
self.pageserver_ids = pageserver_ids
|
||||
self.immediate = immediate
|
||||
|
||||
def apply(self, env: NeonEnv):
|
||||
pageserver = env.get_pageserver(self.pageserver_id)
|
||||
pageserver.stop(immediate=self.immediate)
|
||||
for ps_id in self.pageserver_ids:
|
||||
pageserver = env.get_pageserver(ps_id)
|
||||
pageserver.stop(immediate=self.immediate)
|
||||
|
||||
def clear(self, env: NeonEnv):
|
||||
pageserver = env.get_pageserver(self.pageserver_id)
|
||||
pageserver.start()
|
||||
for ps_id in self.pageserver_ids:
|
||||
pageserver = env.get_pageserver(ps_id)
|
||||
pageserver.start()
|
||||
|
||||
def nodes(self):
|
||||
return self.pageserver_ids
|
||||
|
||||
|
||||
class PageserverFailpoint(Failure):
|
||||
@@ -962,6 +970,9 @@ class PageserverFailpoint(Failure):
|
||||
pageserver = env.get_pageserver(self.pageserver_id)
|
||||
pageserver.http_client().configure_failpoints((self.failpoint, "off"))
|
||||
|
||||
def nodes(self):
|
||||
return [self.pageserver_id]
|
||||
|
||||
|
||||
def build_node_to_tenants_map(env: NeonEnv) -> dict[int, list[TenantId]]:
|
||||
tenants = env.storage_controller.tenant_list()
|
||||
@@ -985,8 +996,9 @@ def build_node_to_tenants_map(env: NeonEnv) -> dict[int, list[TenantId]]:
|
||||
@pytest.mark.parametrize(
|
||||
"failure",
|
||||
[
|
||||
NodeStop(pageserver_id=1, immediate=False),
|
||||
NodeStop(pageserver_id=1, immediate=True),
|
||||
NodeStop(pageserver_ids=[1], immediate=False),
|
||||
NodeStop(pageserver_ids=[1], immediate=True),
|
||||
NodeStop(pageserver_ids=[1, 2], immediate=True),
|
||||
PageserverFailpoint(pageserver_id=1, failpoint="get-utilization-http-handler"),
|
||||
],
|
||||
)
|
||||
@@ -1039,33 +1051,50 @@ def test_storage_controller_heartbeats(
|
||||
wait_until(10, 1, tenants_placed)
|
||||
|
||||
# ... then we apply the failure
|
||||
offline_node_id = failure.pageserver_id
|
||||
online_node_id = (set(range(1, len(env.pageservers) + 1)) - {offline_node_id}).pop()
|
||||
env.get_pageserver(offline_node_id).allowed_errors.append(
|
||||
# In the case of the failpoint failure, the impacted pageserver
|
||||
# still believes it has the tenant attached since location
|
||||
# config calls into it will fail due to being marked offline.
|
||||
".*Dropped remote consistent LSN updates.*",
|
||||
)
|
||||
offline_node_ids = set(failure.nodes())
|
||||
online_node_ids = set(range(1, len(env.pageservers) + 1)) - offline_node_ids
|
||||
|
||||
for node_id in offline_node_ids:
|
||||
env.get_pageserver(node_id).allowed_errors.append(
|
||||
# In the case of the failpoint failure, the impacted pageserver
|
||||
# still believes it has the tenant attached since location
|
||||
# config calls into it will fail due to being marked offline.
|
||||
".*Dropped remote consistent LSN updates.*",
|
||||
)
|
||||
|
||||
if len(offline_node_ids) > 1:
|
||||
env.get_pageserver(node_id).allowed_errors.append(
|
||||
".*Scheduling error when marking pageserver.*offline.*",
|
||||
)
|
||||
|
||||
failure.apply(env)
|
||||
|
||||
# ... expecting the heartbeats to mark it offline
|
||||
def node_offline():
|
||||
def nodes_offline():
|
||||
nodes = env.storage_controller.node_list()
|
||||
log.info(f"{nodes=}")
|
||||
target = next(n for n in nodes if n["id"] == offline_node_id)
|
||||
assert target["availability"] == "Offline"
|
||||
for node in nodes:
|
||||
if node["id"] in offline_node_ids:
|
||||
assert node["availability"] == "Offline"
|
||||
|
||||
# A node is considered offline if the last successful heartbeat
|
||||
# was more than 10 seconds ago (hardcoded in the storage controller).
|
||||
wait_until(20, 1, node_offline)
|
||||
wait_until(20, 1, nodes_offline)
|
||||
|
||||
# .. expecting the tenant on the offline node to be migrated
|
||||
def tenant_migrated():
|
||||
if len(online_node_ids) == 0:
|
||||
time.sleep(5)
|
||||
return
|
||||
|
||||
node_to_tenants = build_node_to_tenants_map(env)
|
||||
log.info(f"{node_to_tenants=}")
|
||||
assert set(node_to_tenants[online_node_id]) == set(tenant_ids)
|
||||
|
||||
observed_tenants = set()
|
||||
for node_id in online_node_ids:
|
||||
observed_tenants |= set(node_to_tenants[node_id])
|
||||
|
||||
assert observed_tenants == set(tenant_ids)
|
||||
|
||||
wait_until(10, 1, tenant_migrated)
|
||||
|
||||
@@ -1073,31 +1102,24 @@ def test_storage_controller_heartbeats(
|
||||
failure.clear(env)
|
||||
|
||||
# ... expecting the offline node to become active again
|
||||
def node_online():
|
||||
def nodes_online():
|
||||
nodes = env.storage_controller.node_list()
|
||||
target = next(n for n in nodes if n["id"] == offline_node_id)
|
||||
assert target["availability"] == "Active"
|
||||
for node in nodes:
|
||||
if node["id"] in online_node_ids:
|
||||
assert node["availability"] == "Active"
|
||||
|
||||
wait_until(10, 1, node_online)
|
||||
wait_until(10, 1, nodes_online)
|
||||
|
||||
time.sleep(5)
|
||||
|
||||
# ... then we create a new tenant
|
||||
tid = TenantId.generate()
|
||||
env.storage_controller.tenant_create(tid)
|
||||
|
||||
# ... expecting it to be placed on the node that just came back online
|
||||
tenants = env.storage_controller.tenant_list()
|
||||
newest_tenant = next(t for t in tenants if t["tenant_shard_id"] == str(tid))
|
||||
locations = list(newest_tenant["observed"]["locations"].keys())
|
||||
locations = [int(node_id) for node_id in locations]
|
||||
assert locations == [offline_node_id]
|
||||
node_to_tenants = build_node_to_tenants_map(env)
|
||||
log.info(f"Back online: {node_to_tenants=}")
|
||||
|
||||
# ... expecting the storage controller to reach a consistent state
|
||||
def storage_controller_consistent():
|
||||
env.storage_controller.consistency_check()
|
||||
|
||||
wait_until(10, 1, storage_controller_consistent)
|
||||
wait_until(30, 1, storage_controller_consistent)
|
||||
|
||||
|
||||
def test_storage_controller_re_attach(neon_env_builder: NeonEnvBuilder):
|
||||
|
||||
@@ -324,14 +324,15 @@ files:
|
||||
help: 'Whether or not the replication slot wal_status is lost'
|
||||
key_labels:
|
||||
- slot_name
|
||||
values: [wal_status_is_lost]
|
||||
values: [wal_is_lost]
|
||||
query: |
|
||||
SELECT slot_name,
|
||||
CASE
|
||||
WHEN wal_status = 'lost' THEN 1
|
||||
ELSE 0
|
||||
END AS wal_status_is_lost
|
||||
END AS wal_is_lost
|
||||
FROM pg_replication_slots;
|
||||
|
||||
- filename: neon_collector_autoscaling.yml
|
||||
content: |
|
||||
collector_name: neon_collector_autoscaling
|
||||
|
||||
Reference in New Issue
Block a user