mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-17 05:00:38 +00:00
Compare commits
1 Commits
reenable_l
...
jcsp/tests
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d1deef0d6b |
55
Cargo.lock
generated
55
Cargo.lock
generated
@@ -206,16 +206,6 @@ dependencies = [
|
||||
"syn 2.0.90",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "assert-json-diff"
|
||||
version = "2.0.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "47e4f2b81832e72834d7518d8487a0396a28cc408186a2e8854c0f98011faf12"
|
||||
dependencies = [
|
||||
"serde",
|
||||
"serde_json",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "async-channel"
|
||||
version = "1.9.0"
|
||||
@@ -1020,12 +1010,6 @@ dependencies = [
|
||||
"generic-array",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "boxcar"
|
||||
version = "0.2.8"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "2721c3c5a6f0e7f7e607125d963fedeb765f545f67adc9d71ed934693881eb42"
|
||||
|
||||
[[package]]
|
||||
name = "bstr"
|
||||
version = "1.5.0"
|
||||
@@ -2449,16 +2433,6 @@ dependencies = [
|
||||
"wasm-bindgen",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "gettid"
|
||||
version = "0.1.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "397256552fed4a9e577850498071831ec8f18ea83368aecc114cab469dcb43e5"
|
||||
dependencies = [
|
||||
"libc",
|
||||
"winapi",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "gimli"
|
||||
version = "0.31.1"
|
||||
@@ -4238,16 +4212,6 @@ dependencies = [
|
||||
"workspace_hack",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "papaya"
|
||||
version = "0.1.8"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "dc7c76487f7eaa00a0fc1d7f88dc6b295aec478d11b0fc79f857b62c2874124c"
|
||||
dependencies = [
|
||||
"equivalent",
|
||||
"seize",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "parking"
|
||||
version = "2.1.1"
|
||||
@@ -4875,7 +4839,6 @@ dependencies = [
|
||||
"ahash",
|
||||
"anyhow",
|
||||
"arc-swap",
|
||||
"assert-json-diff",
|
||||
"async-compression",
|
||||
"async-trait",
|
||||
"atomic-take",
|
||||
@@ -4883,7 +4846,6 @@ dependencies = [
|
||||
"aws-sdk-iam",
|
||||
"aws-sigv4",
|
||||
"base64 0.13.1",
|
||||
"boxcar",
|
||||
"bstr",
|
||||
"bytes",
|
||||
"camino",
|
||||
@@ -4900,7 +4862,6 @@ dependencies = [
|
||||
"flate2",
|
||||
"framed-websockets",
|
||||
"futures",
|
||||
"gettid",
|
||||
"hashbrown 0.14.5",
|
||||
"hashlink",
|
||||
"hex",
|
||||
@@ -4923,9 +4884,7 @@ dependencies = [
|
||||
"measured",
|
||||
"metrics",
|
||||
"once_cell",
|
||||
"opentelemetry",
|
||||
"p256 0.13.2",
|
||||
"papaya",
|
||||
"parking_lot 0.12.1",
|
||||
"parquet",
|
||||
"parquet_derive",
|
||||
@@ -4972,9 +4931,6 @@ dependencies = [
|
||||
"tokio-tungstenite 0.21.0",
|
||||
"tokio-util",
|
||||
"tracing",
|
||||
"tracing-log",
|
||||
"tracing-opentelemetry",
|
||||
"tracing-serde",
|
||||
"tracing-subscriber",
|
||||
"tracing-utils",
|
||||
"try-lock",
|
||||
@@ -5928,16 +5884,6 @@ dependencies = [
|
||||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "seize"
|
||||
version = "0.4.9"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d84b0c858bdd30cb56f5597f8b3bf702ec23829e652cc636a1e5a7b9de46ae93"
|
||||
dependencies = [
|
||||
"libc",
|
||||
"windows-sys 0.52.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "semver"
|
||||
version = "1.0.17"
|
||||
@@ -8199,7 +8145,6 @@ dependencies = [
|
||||
"tower 0.4.13",
|
||||
"tracing",
|
||||
"tracing-core",
|
||||
"tracing-log",
|
||||
"url",
|
||||
"zerocopy",
|
||||
"zeroize",
|
||||
|
||||
@@ -54,7 +54,6 @@ async-compression = { version = "0.4.0", features = ["tokio", "gzip", "zstd"] }
|
||||
atomic-take = "1.1.0"
|
||||
backtrace = "0.3.74"
|
||||
flate2 = "1.0.26"
|
||||
assert-json-diff = "2"
|
||||
async-stream = "0.3"
|
||||
async-trait = "0.1"
|
||||
aws-config = { version = "1.5", default-features = false, features=["rustls", "sso"] }
|
||||
@@ -194,9 +193,7 @@ tower-http = { version = "0.6.2", features = ["request-id", "trace"] }
|
||||
tower-service = "0.3.3"
|
||||
tracing = "0.1"
|
||||
tracing-error = "0.2"
|
||||
tracing-log = "0.2"
|
||||
tracing-opentelemetry = "0.28"
|
||||
tracing-serde = "0.2.0"
|
||||
tracing-subscriber = { version = "0.3", default-features = false, features = ["smallvec", "fmt", "tracing-log", "std", "env-filter", "json"] }
|
||||
try-lock = "0.2.5"
|
||||
twox-hash = { version = "1.6.3", default-features = false }
|
||||
|
||||
@@ -3,13 +3,8 @@ ARG DEBIAN_VERSION=bookworm
|
||||
FROM debian:bookworm-slim AS pgcopydb_builder
|
||||
ARG DEBIAN_VERSION
|
||||
|
||||
# Use strict mode for bash to catch errors early
|
||||
SHELL ["/bin/bash", "-euo", "pipefail", "-c"]
|
||||
|
||||
# By default, /bin/sh used in debian images will treat '\n' as eol,
|
||||
# but as we use bash as SHELL, and built-in echo in bash requires '-e' flag for that.
|
||||
RUN echo 'Acquire::Retries "5";' > /etc/apt/apt.conf.d/80-retries && \
|
||||
echo -e "retry_connrefused = on\ntimeout=15\ntries=5\n" > /root/.wgetrc && \
|
||||
echo -e "retry_connrefused = on\ntimeout=15\ntries=5\n" > /root/.wgetrc \
|
||||
echo -e "--retry-connrefused\n--connect-timeout 15\n--retry 5\n--max-time 300\n" > /root/.curlrc
|
||||
|
||||
RUN if [ "${DEBIAN_VERSION}" = "bookworm" ]; then \
|
||||
@@ -60,8 +55,7 @@ ARG DEBIAN_VERSION
|
||||
|
||||
# Add nonroot user
|
||||
RUN useradd -ms /bin/bash nonroot -b /home
|
||||
# Use strict mode for bash to catch errors early
|
||||
SHELL ["/bin/bash", "-euo", "pipefail", "-c"]
|
||||
SHELL ["/bin/bash", "-c"]
|
||||
|
||||
RUN mkdir -p /pgcopydb/bin && \
|
||||
mkdir -p /pgcopydb/lib && \
|
||||
@@ -72,7 +66,7 @@ COPY --from=pgcopydb_builder /usr/lib/postgresql/16/bin/pgcopydb /pgcopydb/bin/p
|
||||
COPY --from=pgcopydb_builder /pgcopydb/lib/libpq.so.5 /pgcopydb/lib/libpq.so.5
|
||||
|
||||
RUN echo 'Acquire::Retries "5";' > /etc/apt/apt.conf.d/80-retries && \
|
||||
echo -e "retry_connrefused = on\ntimeout=15\ntries=5\n" > /root/.wgetrc && \
|
||||
echo -e "retry_connrefused = on\ntimeout=15\ntries=5\n" > /root/.wgetrc \
|
||||
echo -e "--retry-connrefused\n--connect-timeout 15\n--retry 5\n--max-time 300\n" > /root/.curlrc
|
||||
|
||||
# System deps
|
||||
@@ -196,14 +190,8 @@ RUN set -e \
|
||||
# It includes several bug fixes on top on v2.0 release (https://github.com/linux-test-project/lcov/compare/v2.0...master)
|
||||
# And patches from us:
|
||||
# - Generates json file with code coverage summary (https://github.com/neondatabase/lcov/commit/426e7e7a22f669da54278e9b55e6d8caabd00af0.tar.gz)
|
||||
RUN set +o pipefail && \
|
||||
for package in Capture::Tiny DateTime Devel::Cover Digest::MD5 File::Spec JSON::XS Memory::Process Time::HiRes JSON; do \
|
||||
yes | perl -MCPAN -e "CPAN::Shell->notest('install', '$package')";\
|
||||
done && \
|
||||
set -o pipefail
|
||||
# Split into separate step to debug flaky failures here
|
||||
RUN wget https://github.com/neondatabase/lcov/archive/426e7e7a22f669da54278e9b55e6d8caabd00af0.tar.gz -O lcov.tar.gz \
|
||||
&& ls -laht lcov.tar.gz && sha256sum lcov.tar.gz \
|
||||
RUN for package in Capture::Tiny DateTime Devel::Cover Digest::MD5 File::Spec JSON::XS Memory::Process Time::HiRes JSON; do yes | perl -MCPAN -e "CPAN::Shell->notest('install', '$package')"; done \
|
||||
&& wget https://github.com/neondatabase/lcov/archive/426e7e7a22f669da54278e9b55e6d8caabd00af0.tar.gz -O lcov.tar.gz \
|
||||
&& echo "61a22a62e20908b8b9e27d890bd0ea31f567a7b9668065589266371dcbca0992 lcov.tar.gz" | sha256sum --check \
|
||||
&& mkdir -p lcov && tar -xzf lcov.tar.gz -C lcov --strip-components=1 \
|
||||
&& cd lcov \
|
||||
|
||||
@@ -85,10 +85,6 @@ ARG DEBIAN_VERSION=bookworm
|
||||
ARG DEBIAN_FLAVOR=${DEBIAN_VERSION}-slim
|
||||
ARG ALPINE_CURL_VERSION=8.11.1
|
||||
|
||||
# By default, build all PostgreSQL extensions. For quick local testing when you don't
|
||||
# care about the extensions, pass EXTENSIONS=none or EXTENSIONS=minimal
|
||||
ARG EXTENSIONS=all
|
||||
|
||||
#########################################################################################
|
||||
#
|
||||
# Layer "build-deps"
|
||||
@@ -100,10 +96,8 @@ ARG DEBIAN_VERSION
|
||||
# Use strict mode for bash to catch errors early
|
||||
SHELL ["/bin/bash", "-euo", "pipefail", "-c"]
|
||||
|
||||
# By default, /bin/sh used in debian images will treat '\n' as eol,
|
||||
# but as we use bash as SHELL, and built-in echo in bash requires '-e' flag for that.
|
||||
RUN echo 'Acquire::Retries "5";' > /etc/apt/apt.conf.d/80-retries && \
|
||||
echo -e "retry_connrefused = on\ntimeout=15\ntries=5\n" > /root/.wgetrc && \
|
||||
echo -e "retry_connrefused = on\ntimeout=15\ntries=5\n" > /root/.wgetrc \
|
||||
echo -e "--retry-connrefused\n--connect-timeout 15\n--retry 5\n--max-time 300\n" > /root/.curlrc
|
||||
|
||||
RUN case $DEBIAN_VERSION in \
|
||||
@@ -1074,7 +1068,6 @@ ENV PATH="/home/nonroot/.cargo/bin:$PATH"
|
||||
USER nonroot
|
||||
WORKDIR /home/nonroot
|
||||
|
||||
# See comment on the top of the file regading `echo` and `\n`
|
||||
RUN echo -e "--retry-connrefused\n--connect-timeout 15\n--retry 5\n--max-time 300\n" > /home/nonroot/.curlrc
|
||||
|
||||
RUN curl -sSO https://static.rust-lang.org/rustup/dist/$(uname -m)-unknown-linux-gnu/rustup-init && \
|
||||
@@ -1488,35 +1481,12 @@ RUN make -j $(getconf _NPROCESSORS_ONLN) \
|
||||
|
||||
#########################################################################################
|
||||
#
|
||||
# Layer "extensions-none"
|
||||
#
|
||||
#########################################################################################
|
||||
FROM build-deps AS extensions-none
|
||||
|
||||
RUN mkdir /usr/local/pgsql
|
||||
|
||||
#########################################################################################
|
||||
#
|
||||
# Layer "extensions-minimal"
|
||||
#
|
||||
# This subset of extensions includes the extensions that we have in
|
||||
# shared_preload_libraries by default.
|
||||
#
|
||||
#########################################################################################
|
||||
FROM build-deps AS extensions-minimal
|
||||
|
||||
COPY --from=pgrag-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY --from=timescaledb-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY --from=pg_cron-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY --from=pg_partman-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
|
||||
#########################################################################################
|
||||
#
|
||||
# Layer "extensions-all"
|
||||
# Layer "all-extensions"
|
||||
# Bundle together all the extensions
|
||||
#
|
||||
#########################################################################################
|
||||
FROM build-deps AS extensions-all
|
||||
FROM build-deps AS all-extensions
|
||||
ARG PG_VERSION
|
||||
|
||||
# Public extensions
|
||||
COPY --from=postgis-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
@@ -1558,13 +1528,7 @@ COPY --from=pg_partman-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY --from=pg_mooncake-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY --from=pg_repack-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
|
||||
#########################################################################################
|
||||
#
|
||||
# Layer "neon-pg-ext-build"
|
||||
# Includes Postgres and all the extensions chosen by EXTENSIONS arg.
|
||||
#
|
||||
#########################################################################################
|
||||
FROM extensions-${EXTENSIONS} AS neon-pg-ext-build
|
||||
COPY --from=neon-ext-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
|
||||
#########################################################################################
|
||||
#
|
||||
@@ -1620,7 +1584,6 @@ FROM alpine/curl:${ALPINE_CURL_VERSION} AS exporters
|
||||
ARG TARGETARCH
|
||||
# Keep sql_exporter version same as in build-tools.Dockerfile and
|
||||
# test_runner/regress/test_compute_metrics.py
|
||||
# See comment on the top of the file regading `echo`, `-e` and `\n`
|
||||
RUN echo -e "--retry-connrefused\n--connect-timeout 15\n--retry 5\n--max-time 300\n" > /root/.curlrc; \
|
||||
if [ "$TARGETARCH" = "amd64" ]; then\
|
||||
postgres_exporter_sha256='027e75dda7af621237ff8f5ac66b78a40b0093595f06768612b92b1374bd3105';\
|
||||
@@ -1647,8 +1610,7 @@ RUN echo -e "--retry-connrefused\n--connect-timeout 15\n--retry 5\n--max-time 30
|
||||
#
|
||||
#########################################################################################
|
||||
FROM neon-ext-build AS postgres-cleanup-layer
|
||||
|
||||
COPY --from=neon-pg-ext-build /usr/local/pgsql /usr/local/pgsql
|
||||
COPY --from=all-extensions /usr/local/pgsql /usr/local/pgsql
|
||||
|
||||
# Remove binaries from /bin/ that we won't use (or would manually copy & install otherwise)
|
||||
RUN cd /usr/local/pgsql/bin && rm -f ecpg raster2pgsql shp2pgsql pgtopo_export pgtopo_import pgsql2shp
|
||||
@@ -1738,10 +1700,6 @@ ENV PGDATABASE=postgres
|
||||
#########################################################################################
|
||||
FROM debian:$DEBIAN_FLAVOR
|
||||
ARG DEBIAN_VERSION
|
||||
|
||||
# Use strict mode for bash to catch errors early
|
||||
SHELL ["/bin/bash", "-euo", "pipefail", "-c"]
|
||||
|
||||
# Add user postgres
|
||||
RUN mkdir /var/db && useradd -m -d /var/db/postgres postgres && \
|
||||
echo "postgres:test_console_pass" | chpasswd && \
|
||||
|
||||
@@ -32,7 +32,6 @@ reason = "the marvin attack only affects private key decryption, not public key
|
||||
# https://embarkstudios.github.io/cargo-deny/checks/licenses/cfg.html
|
||||
[licenses]
|
||||
allow = [
|
||||
"0BSD",
|
||||
"Apache-2.0",
|
||||
"BSD-2-Clause",
|
||||
"BSD-3-Clause",
|
||||
|
||||
@@ -52,7 +52,6 @@ for pg_version in ${TEST_VERSION_ONLY-14 15 16 17}; do
|
||||
|
||||
if [ $pg_version -ge 16 ]; then
|
||||
docker cp ext-src $TEST_CONTAINER_NAME:/
|
||||
docker exec $TEST_CONTAINER_NAME bash -c "apt update && apt install -y libtap-parser-sourcehandler-pgtap-perl"
|
||||
# This is required for the pg_hint_plan test, to prevent flaky log message causing the test to fail
|
||||
# It cannot be moved to Dockerfile now because the database directory is created after the start of the container
|
||||
echo Adding dummy config
|
||||
|
||||
@@ -1,4 +0,0 @@
|
||||
#!/bin/bash
|
||||
set -ex
|
||||
cd "$(dirname "${0}")"
|
||||
pg_prove test.sql
|
||||
@@ -1,15 +0,0 @@
|
||||
diff --git a/test.sql b/test.sql
|
||||
index d7a0ca8..f15bc76 100644
|
||||
--- a/test.sql
|
||||
+++ b/test.sql
|
||||
@@ -9,9 +9,7 @@
|
||||
\set ON_ERROR_STOP true
|
||||
\set QUIET 1
|
||||
|
||||
-CREATE EXTENSION pgcrypto;
|
||||
-CREATE EXTENSION pgtap;
|
||||
-CREATE EXTENSION pgjwt;
|
||||
+CREATE EXTENSION IF NOT EXISTS pgtap;
|
||||
|
||||
BEGIN;
|
||||
SELECT plan(23);
|
||||
@@ -1,5 +0,0 @@
|
||||
#!/bin/sh
|
||||
set -ex
|
||||
cd "$(dirname ${0})"
|
||||
patch -p1 <test-upgrade.patch
|
||||
pg_prove test.sql
|
||||
@@ -24,7 +24,7 @@ function wait_for_ready {
|
||||
}
|
||||
function create_extensions() {
|
||||
for ext in ${1}; do
|
||||
docker compose exec neon-test-extensions psql -X -v ON_ERROR_STOP=1 -d contrib_regression -c "CREATE EXTENSION IF NOT EXISTS ${ext} CASCADE"
|
||||
docker compose exec neon-test-extensions psql -X -v ON_ERROR_STOP=1 -d contrib_regression -c "CREATE EXTENSION IF NOT EXISTS ${ext}"
|
||||
done
|
||||
}
|
||||
EXTENSIONS='[
|
||||
@@ -40,8 +40,7 @@ EXTENSIONS='[
|
||||
{"extname": "pg_uuidv7", "extdir": "pg_uuidv7-src"},
|
||||
{"extname": "roaringbitmap", "extdir": "pg_roaringbitmap-src"},
|
||||
{"extname": "semver", "extdir": "pg_semver-src"},
|
||||
{"extname": "pg_ivm", "extdir": "pg_ivm-src"},
|
||||
{"extname": "pgjwt", "extdir": "pgjwt-src"}
|
||||
{"extname": "pg_ivm", "extdir": "pg_ivm-src"}
|
||||
]'
|
||||
EXTNAMES=$(echo ${EXTENSIONS} | jq -r '.[].extname' | paste -sd ' ' -)
|
||||
TAG=${NEWTAG} docker compose --profile test-extensions up --quiet-pull --build -d
|
||||
|
||||
@@ -204,16 +204,14 @@ impl RemoteExtSpec {
|
||||
|
||||
// Check if extension is present in public or custom.
|
||||
// If not, then it is not allowed to be used by this compute.
|
||||
if !self
|
||||
.public_extensions
|
||||
.as_ref()
|
||||
.is_some_and(|exts| exts.iter().any(|e| e == ext_name))
|
||||
&& !self
|
||||
.custom_extensions
|
||||
.as_ref()
|
||||
.is_some_and(|exts| exts.iter().any(|e| e == ext_name))
|
||||
{
|
||||
return Err(anyhow::anyhow!("extension {} is not found", real_ext_name));
|
||||
if let Some(public_extensions) = &self.public_extensions {
|
||||
if !public_extensions.contains(&real_ext_name.to_string()) {
|
||||
if let Some(custom_extensions) = &self.custom_extensions {
|
||||
if !custom_extensions.contains(&real_ext_name.to_string()) {
|
||||
return Err(anyhow::anyhow!("extension {} is not found", real_ext_name));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
match self.extension_data.get(real_ext_name) {
|
||||
@@ -342,96 +340,6 @@ mod tests {
|
||||
use super::*;
|
||||
use std::fs::File;
|
||||
|
||||
#[test]
|
||||
fn allow_installing_remote_extensions() {
|
||||
let rspec: RemoteExtSpec = serde_json::from_value(serde_json::json!({
|
||||
"public_extensions": null,
|
||||
"custom_extensions": null,
|
||||
"library_index": {},
|
||||
"extension_data": {},
|
||||
}))
|
||||
.unwrap();
|
||||
|
||||
rspec
|
||||
.get_ext("ext", false, "latest", "v17")
|
||||
.expect_err("Extension should not be found");
|
||||
|
||||
let rspec: RemoteExtSpec = serde_json::from_value(serde_json::json!({
|
||||
"public_extensions": [],
|
||||
"custom_extensions": null,
|
||||
"library_index": {},
|
||||
"extension_data": {},
|
||||
}))
|
||||
.unwrap();
|
||||
|
||||
rspec
|
||||
.get_ext("ext", false, "latest", "v17")
|
||||
.expect_err("Extension should not be found");
|
||||
|
||||
let rspec: RemoteExtSpec = serde_json::from_value(serde_json::json!({
|
||||
"public_extensions": [],
|
||||
"custom_extensions": [],
|
||||
"library_index": {
|
||||
"ext": "ext"
|
||||
},
|
||||
"extension_data": {
|
||||
"ext": {
|
||||
"control_data": {
|
||||
"ext.control": ""
|
||||
},
|
||||
"archive_path": ""
|
||||
}
|
||||
},
|
||||
}))
|
||||
.unwrap();
|
||||
|
||||
rspec
|
||||
.get_ext("ext", false, "latest", "v17")
|
||||
.expect_err("Extension should not be found");
|
||||
|
||||
let rspec: RemoteExtSpec = serde_json::from_value(serde_json::json!({
|
||||
"public_extensions": [],
|
||||
"custom_extensions": ["ext"],
|
||||
"library_index": {
|
||||
"ext": "ext"
|
||||
},
|
||||
"extension_data": {
|
||||
"ext": {
|
||||
"control_data": {
|
||||
"ext.control": ""
|
||||
},
|
||||
"archive_path": ""
|
||||
}
|
||||
},
|
||||
}))
|
||||
.unwrap();
|
||||
|
||||
rspec
|
||||
.get_ext("ext", false, "latest", "v17")
|
||||
.expect("Extension should be found");
|
||||
|
||||
let rspec: RemoteExtSpec = serde_json::from_value(serde_json::json!({
|
||||
"public_extensions": ["ext"],
|
||||
"custom_extensions": [],
|
||||
"library_index": {
|
||||
"ext": "ext"
|
||||
},
|
||||
"extension_data": {
|
||||
"ext": {
|
||||
"control_data": {
|
||||
"ext.control": ""
|
||||
},
|
||||
"archive_path": ""
|
||||
}
|
||||
},
|
||||
}))
|
||||
.unwrap();
|
||||
|
||||
rspec
|
||||
.get_ext("ext", false, "latest", "v17")
|
||||
.expect("Extension should be found");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_spec_file() {
|
||||
let file = File::open("tests/cluster_spec.json").unwrap();
|
||||
|
||||
@@ -32,7 +32,6 @@ use utils::id::TimelineId;
|
||||
|
||||
use crate::config::PageServerConf;
|
||||
use crate::context::{PageContentKind, RequestContext};
|
||||
use crate::pgdatadir_mapping::DatadirModificationStats;
|
||||
use crate::task_mgr::TaskKind;
|
||||
use crate::tenant::layer_map::LayerMap;
|
||||
use crate::tenant::mgr::TenantSlot;
|
||||
@@ -2379,40 +2378,11 @@ pub(crate) struct WalIngestMetrics {
|
||||
pub(crate) records_observed: IntCounter,
|
||||
pub(crate) records_committed: IntCounter,
|
||||
pub(crate) records_filtered: IntCounter,
|
||||
pub(crate) values_committed_metadata_images: IntCounter,
|
||||
pub(crate) values_committed_metadata_deltas: IntCounter,
|
||||
pub(crate) values_committed_data_images: IntCounter,
|
||||
pub(crate) values_committed_data_deltas: IntCounter,
|
||||
pub(crate) gap_blocks_zeroed_on_rel_extend: IntCounter,
|
||||
}
|
||||
|
||||
impl WalIngestMetrics {
|
||||
pub(crate) fn inc_values_committed(&self, stats: &DatadirModificationStats) {
|
||||
if stats.metadata_images > 0 {
|
||||
self.values_committed_metadata_images
|
||||
.inc_by(stats.metadata_images);
|
||||
}
|
||||
if stats.metadata_deltas > 0 {
|
||||
self.values_committed_metadata_deltas
|
||||
.inc_by(stats.metadata_deltas);
|
||||
}
|
||||
if stats.data_images > 0 {
|
||||
self.values_committed_data_images.inc_by(stats.data_images);
|
||||
}
|
||||
if stats.data_deltas > 0 {
|
||||
self.values_committed_data_deltas.inc_by(stats.data_deltas);
|
||||
}
|
||||
}
|
||||
pub(crate) clear_vm_bits_unknown: IntCounterVec,
|
||||
}
|
||||
|
||||
pub(crate) static WAL_INGEST: Lazy<WalIngestMetrics> = Lazy::new(|| {
|
||||
let values_committed = register_int_counter_vec!(
|
||||
"pageserver_wal_ingest_values_committed",
|
||||
"Number of values committed to pageserver storage from WAL records",
|
||||
&["class", "kind"],
|
||||
)
|
||||
.expect("failed to define a metric");
|
||||
|
||||
WalIngestMetrics {
|
||||
bytes_received: register_int_counter!(
|
||||
"pageserver_wal_ingest_bytes_received",
|
||||
@@ -2439,15 +2409,17 @@ pub(crate) static WAL_INGEST: Lazy<WalIngestMetrics> = Lazy::new(|| {
|
||||
"Number of WAL records filtered out due to sharding"
|
||||
)
|
||||
.expect("failed to define a metric"),
|
||||
values_committed_metadata_images: values_committed.with_label_values(&["metadata", "image"]),
|
||||
values_committed_metadata_deltas: values_committed.with_label_values(&["metadata", "delta"]),
|
||||
values_committed_data_images: values_committed.with_label_values(&["data", "image"]),
|
||||
values_committed_data_deltas: values_committed.with_label_values(&["data", "delta"]),
|
||||
gap_blocks_zeroed_on_rel_extend: register_int_counter!(
|
||||
"pageserver_gap_blocks_zeroed_on_rel_extend",
|
||||
"Total number of zero gap blocks written on relation extends"
|
||||
)
|
||||
.expect("failed to define a metric"),
|
||||
clear_vm_bits_unknown: register_int_counter_vec!(
|
||||
"pageserver_wal_ingest_clear_vm_bits_unknown",
|
||||
"Number of ignored ClearVmBits operations due to unknown pages/relations",
|
||||
&["entity"],
|
||||
)
|
||||
.expect("failed to define a metric"),
|
||||
}
|
||||
});
|
||||
|
||||
|
||||
@@ -48,7 +48,7 @@ use tracing::{debug, trace, warn};
|
||||
use utils::bin_ser::DeserializeError;
|
||||
use utils::pausable_failpoint;
|
||||
use utils::{bin_ser::BeSer, lsn::Lsn};
|
||||
use wal_decoder::serialized_batch::{SerializedValueBatch, ValueMeta};
|
||||
use wal_decoder::serialized_batch::SerializedValueBatch;
|
||||
|
||||
/// Max delta records appended to the AUX_FILES_KEY (for aux v1). The write path will write a full image once this threshold is reached.
|
||||
pub const MAX_AUX_FILE_DELTAS: usize = 1024;
|
||||
@@ -1297,26 +1297,6 @@ impl DatadirModification<'_> {
|
||||
.is_some_and(|b| b.has_data())
|
||||
}
|
||||
|
||||
/// Returns statistics about the currently pending modifications.
|
||||
pub(crate) fn stats(&self) -> DatadirModificationStats {
|
||||
let mut stats = DatadirModificationStats::default();
|
||||
for (_, _, value) in self.pending_metadata_pages.values().flatten() {
|
||||
match value {
|
||||
Value::Image(_) => stats.metadata_images += 1,
|
||||
Value::WalRecord(r) if r.will_init() => stats.metadata_images += 1,
|
||||
Value::WalRecord(_) => stats.metadata_deltas += 1,
|
||||
}
|
||||
}
|
||||
for valuemeta in self.pending_data_batch.iter().flat_map(|b| &b.metadata) {
|
||||
match valuemeta {
|
||||
ValueMeta::Serialized(s) if s.will_init => stats.data_images += 1,
|
||||
ValueMeta::Serialized(_) => stats.data_deltas += 1,
|
||||
ValueMeta::Observed(_) => {}
|
||||
}
|
||||
}
|
||||
stats
|
||||
}
|
||||
|
||||
/// Set the current lsn
|
||||
pub(crate) fn set_lsn(&mut self, lsn: Lsn) -> anyhow::Result<()> {
|
||||
ensure!(
|
||||
@@ -2337,15 +2317,6 @@ impl DatadirModification<'_> {
|
||||
}
|
||||
}
|
||||
|
||||
/// Statistics for a DatadirModification.
|
||||
#[derive(Default)]
|
||||
pub struct DatadirModificationStats {
|
||||
pub metadata_images: u64,
|
||||
pub metadata_deltas: u64,
|
||||
pub data_images: u64,
|
||||
pub data_deltas: u64,
|
||||
}
|
||||
|
||||
/// This struct facilitates accessing either a committed key from the timeline at a
|
||||
/// specific LSN, or the latest uncommitted key from a pending modification.
|
||||
///
|
||||
|
||||
@@ -9,14 +9,13 @@ use crate::{
|
||||
metrics::SECONDARY_MODE,
|
||||
tenant::{
|
||||
config::AttachmentMode,
|
||||
mgr::{GetTenantError, TenantManager},
|
||||
mgr::GetTenantError,
|
||||
mgr::TenantManager,
|
||||
remote_timeline_client::remote_heatmap_path,
|
||||
span::debug_assert_current_span_has_tenant_id,
|
||||
tasks::{warn_when_period_overrun, BackgroundLoopKind},
|
||||
Tenant,
|
||||
},
|
||||
virtual_file::VirtualFile,
|
||||
TEMP_FILE_SUFFIX,
|
||||
};
|
||||
|
||||
use futures::Future;
|
||||
@@ -33,10 +32,7 @@ use super::{
|
||||
};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{info_span, instrument, Instrument};
|
||||
use utils::{
|
||||
backoff, completion::Barrier, crashsafe::path_with_suffix_extension,
|
||||
yielding_loop::yielding_loop,
|
||||
};
|
||||
use utils::{backoff, completion::Barrier, yielding_loop::yielding_loop};
|
||||
|
||||
pub(super) async fn heatmap_uploader_task(
|
||||
tenant_manager: Arc<TenantManager>,
|
||||
@@ -465,18 +461,6 @@ async fn upload_tenant_heatmap(
|
||||
}
|
||||
}
|
||||
|
||||
// After a successful upload persist the fresh heatmap to disk.
|
||||
// When restarting, the tenant will read the heatmap from disk
|
||||
// and additively generate a new heatmap (see [`Timeline::generate_heatmap`]).
|
||||
// If the heatmap is stale, the additive generation can lead to keeping previously
|
||||
// evicted timelines on the secondarie's disk.
|
||||
let tenant_shard_id = tenant.get_tenant_shard_id();
|
||||
let heatmap_path = tenant.conf.tenant_heatmap_path(tenant_shard_id);
|
||||
let temp_path = path_with_suffix_extension(&heatmap_path, TEMP_FILE_SUFFIX);
|
||||
if let Err(err) = VirtualFile::crashsafe_overwrite(heatmap_path, temp_path, bytes).await {
|
||||
tracing::warn!("Non fatal IO error writing to disk after heatmap upload: {err}");
|
||||
}
|
||||
|
||||
tracing::info!("Successfully uploaded {size} byte heatmap to {path}");
|
||||
|
||||
Ok(UploadHeatmapOutcome::Uploaded(LastUploadState {
|
||||
|
||||
@@ -192,12 +192,7 @@ pub enum ImageLayerCreationMode {
|
||||
|
||||
#[derive(Clone, Debug, Default)]
|
||||
pub enum LastImageLayerCreationStatus {
|
||||
Incomplete {
|
||||
/// The last key of the partition (exclusive) that was processed in the last
|
||||
/// image layer creation attempt. We will continue from this key in the next
|
||||
/// attempt.
|
||||
last_key: Key,
|
||||
},
|
||||
Incomplete, // TODO: record the last key being processed
|
||||
Complete,
|
||||
#[default]
|
||||
Initial,
|
||||
@@ -4351,7 +4346,7 @@ impl Timeline {
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
// Is it time to create a new image layer for the given partition? True if we want to generate.
|
||||
// Is it time to create a new image layer for the given partition?
|
||||
async fn time_for_new_image_layer(&self, partition: &KeySpace, lsn: Lsn) -> bool {
|
||||
let threshold = self.get_image_creation_threshold();
|
||||
|
||||
@@ -4663,11 +4658,6 @@ impl Timeline {
|
||||
) -> Result<(Vec<ResidentLayer>, LastImageLayerCreationStatus), CreateImageLayersError> {
|
||||
let timer = self.metrics.create_images_time_histo.start_timer();
|
||||
|
||||
if partitioning.parts.is_empty() {
|
||||
warn!("no partitions to create image layers for");
|
||||
return Ok((vec![], LastImageLayerCreationStatus::Complete));
|
||||
}
|
||||
|
||||
// We need to avoid holes between generated image layers.
|
||||
// Otherwise LayerMap::image_layer_exists will return false if key range of some layer is covered by more than one
|
||||
// image layer with hole between them. In this case such layer can not be utilized by GC.
|
||||
@@ -4679,65 +4669,28 @@ impl Timeline {
|
||||
// image layers <100000000..100000099> and <200000000..200000199> are not completely covering it.
|
||||
let mut start = Key::MIN;
|
||||
|
||||
let check_for_image_layers =
|
||||
if let LastImageLayerCreationStatus::Incomplete { last_key } = last_status {
|
||||
info!(
|
||||
"resuming image layer creation: last_status=incomplete, continue from {}",
|
||||
last_key
|
||||
);
|
||||
true
|
||||
} else {
|
||||
self.should_check_if_image_layers_required(lsn)
|
||||
};
|
||||
let check_for_image_layers = if let LastImageLayerCreationStatus::Incomplete = last_status {
|
||||
info!(
|
||||
"resuming image layer creation: last_status={:?}",
|
||||
last_status
|
||||
);
|
||||
true
|
||||
} else {
|
||||
self.should_check_if_image_layers_required(lsn)
|
||||
};
|
||||
|
||||
let mut batch_image_writer = BatchLayerWriter::new(self.conf).await?;
|
||||
|
||||
let mut all_generated = true;
|
||||
|
||||
let mut partition_processed = 0;
|
||||
let mut total_partitions = partitioning.parts.len();
|
||||
let mut last_partition_processed = None;
|
||||
let mut partition_parts = partitioning.parts.clone();
|
||||
let total_partitions = partitioning.parts.len();
|
||||
|
||||
if let LastImageLayerCreationStatus::Incomplete { last_key } = last_status {
|
||||
// We need to skip the partitions that have already been processed.
|
||||
let mut found = false;
|
||||
for (i, partition) in partition_parts.iter().enumerate() {
|
||||
if last_key <= partition.end().unwrap() {
|
||||
// ```plain
|
||||
// |------|--------|----------|------|
|
||||
// ^last_key
|
||||
// ^start from this partition
|
||||
// ```
|
||||
// Why `i+1` instead of `i`?
|
||||
// It is possible that the user did some writes after the previous image layer creation attempt so that
|
||||
// a relation grows in size, and the last_key is now in the middle of the partition. In this case, we
|
||||
// still want to skip this partition, so that we can make progress and avoid generating image layers over
|
||||
// the same partition. Doing a mod to ensure we don't end up with an empty vec.
|
||||
if i + 1 >= total_partitions {
|
||||
// In general, this case should not happen -- if last_key is on the last partition, the previous
|
||||
// iteration of image layer creation should return a complete status.
|
||||
break; // with found=false
|
||||
}
|
||||
partition_parts = partition_parts.split_off(i + 1); // Remove the first i + 1 elements
|
||||
total_partitions = partition_parts.len();
|
||||
// Update the start key to the partition start.
|
||||
start = partition_parts[0].start().unwrap();
|
||||
found = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if !found {
|
||||
// Last key is within the last partition, or larger than all partitions.
|
||||
return Ok((vec![], LastImageLayerCreationStatus::Complete));
|
||||
}
|
||||
}
|
||||
|
||||
for partition in partition_parts.iter() {
|
||||
for partition in partitioning.parts.iter() {
|
||||
if self.cancel.is_cancelled() {
|
||||
return Err(CreateImageLayersError::Cancelled);
|
||||
}
|
||||
partition_processed += 1;
|
||||
|
||||
let img_range = start..partition.ranges.last().unwrap().end;
|
||||
let compact_metadata = partition.overlaps(&Key::metadata_key_range());
|
||||
if compact_metadata {
|
||||
@@ -4772,8 +4725,6 @@ impl Timeline {
|
||||
lsn_range: PersistentLayerDesc::image_layer_lsn_range(lsn),
|
||||
is_delta: false,
|
||||
}) {
|
||||
// TODO: this can be processed with the BatchLayerWriter::finish_with_discard
|
||||
// in the future.
|
||||
tracing::info!(
|
||||
"Skipping image layer at {lsn} {}..{}, already exists",
|
||||
img_range.start,
|
||||
@@ -4854,6 +4805,8 @@ impl Timeline {
|
||||
}
|
||||
}
|
||||
|
||||
partition_processed += 1;
|
||||
|
||||
if let ImageLayerCreationMode::Try = mode {
|
||||
// We have at least made some progress
|
||||
if batch_image_writer.pending_layer_num() >= 1 {
|
||||
@@ -4869,10 +4822,8 @@ impl Timeline {
|
||||
* self.get_compaction_threshold();
|
||||
if image_preempt_threshold != 0 && num_of_l0_layers >= image_preempt_threshold {
|
||||
tracing::info!(
|
||||
"preempt image layer generation at {lsn} when processing partition {}..{}: too many L0 layers {}",
|
||||
partition.start().unwrap(), partition.end().unwrap(), num_of_l0_layers
|
||||
"preempt image layer generation at {start} at {lsn}: too many L0 layers {num_of_l0_layers}",
|
||||
);
|
||||
last_partition_processed = Some(partition.clone());
|
||||
all_generated = false;
|
||||
break;
|
||||
}
|
||||
@@ -4917,14 +4868,7 @@ impl Timeline {
|
||||
if all_generated {
|
||||
LastImageLayerCreationStatus::Complete
|
||||
} else {
|
||||
LastImageLayerCreationStatus::Incomplete {
|
||||
last_key: if let Some(last_partition_processed) = last_partition_processed {
|
||||
last_partition_processed.end().unwrap_or(Key::MIN)
|
||||
} else {
|
||||
// This branch should be unreachable, but in case it happens, we can just return the start key.
|
||||
Key::MIN
|
||||
},
|
||||
}
|
||||
LastImageLayerCreationStatus::Incomplete
|
||||
},
|
||||
))
|
||||
}
|
||||
|
||||
@@ -748,7 +748,7 @@ impl Timeline {
|
||||
.store(Arc::new(outcome.clone()));
|
||||
|
||||
self.upload_new_image_layers(image_layers)?;
|
||||
if let LastImageLayerCreationStatus::Incomplete { .. } = outcome {
|
||||
if let LastImageLayerCreationStatus::Incomplete = outcome {
|
||||
// Yield and do not do any other kind of compaction.
|
||||
info!("skipping shard ancestor compaction due to pending image layer generation tasks (preempted by L0 compaction).");
|
||||
return Ok(CompactionOutcome::Pending);
|
||||
|
||||
@@ -355,19 +355,6 @@ pub(super) async fn handle_walreceiver_connection(
|
||||
// advances it to its end LSN. 0 is just an initialization placeholder.
|
||||
let mut modification = timeline.begin_modification(Lsn(0));
|
||||
|
||||
async fn commit(
|
||||
modification: &mut DatadirModification<'_>,
|
||||
ctx: &RequestContext,
|
||||
uncommitted: &mut u64,
|
||||
) -> anyhow::Result<()> {
|
||||
let stats = modification.stats();
|
||||
modification.commit(ctx).await?;
|
||||
WAL_INGEST.records_committed.inc_by(*uncommitted);
|
||||
WAL_INGEST.inc_values_committed(&stats);
|
||||
*uncommitted = 0;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
if !records.is_empty() {
|
||||
timeline
|
||||
.metrics
|
||||
@@ -379,7 +366,8 @@ pub(super) async fn handle_walreceiver_connection(
|
||||
if matches!(interpreted.flush_uncommitted, FlushUncommittedRecords::Yes)
|
||||
&& uncommitted_records > 0
|
||||
{
|
||||
commit(&mut modification, &ctx, &mut uncommitted_records).await?;
|
||||
modification.commit(&ctx).await?;
|
||||
uncommitted_records = 0;
|
||||
}
|
||||
|
||||
let local_next_record_lsn = interpreted.next_record_lsn;
|
||||
@@ -408,7 +396,8 @@ pub(super) async fn handle_walreceiver_connection(
|
||||
|| modification.approx_pending_bytes()
|
||||
> DatadirModification::MAX_PENDING_BYTES
|
||||
{
|
||||
commit(&mut modification, &ctx, &mut uncommitted_records).await?;
|
||||
modification.commit(&ctx).await?;
|
||||
uncommitted_records = 0;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -426,7 +415,7 @@ pub(super) async fn handle_walreceiver_connection(
|
||||
|
||||
if uncommitted_records > 0 || needs_last_record_lsn_advance {
|
||||
// Commit any uncommitted records
|
||||
commit(&mut modification, &ctx, &mut uncommitted_records).await?;
|
||||
modification.commit(&ctx).await?;
|
||||
}
|
||||
|
||||
if !caught_up && streaming_lsn >= end_of_wal {
|
||||
@@ -453,12 +442,10 @@ pub(super) async fn handle_walreceiver_connection(
|
||||
filtered: &mut u64,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
let stats = modification.stats();
|
||||
modification.commit(ctx).await?;
|
||||
WAL_INGEST
|
||||
.records_committed
|
||||
.inc_by(*uncommitted - *filtered);
|
||||
WAL_INGEST.inc_values_committed(&stats);
|
||||
modification.commit(ctx).await?;
|
||||
*uncommitted = 0;
|
||||
*filtered = 0;
|
||||
Ok(())
|
||||
|
||||
@@ -28,9 +28,17 @@ use std::time::Duration;
|
||||
use std::time::Instant;
|
||||
use std::time::SystemTime;
|
||||
|
||||
use pageserver_api::shard::ShardIdentity;
|
||||
use postgres_ffi::fsm_logical_to_physical;
|
||||
use postgres_ffi::walrecord::*;
|
||||
use postgres_ffi::{dispatch_pgversion, enum_pgversion, enum_pgversion_dispatch, TimestampTz};
|
||||
use wal_decoder::models::*;
|
||||
|
||||
use anyhow::{bail, Result};
|
||||
use bytes::{Buf, Bytes};
|
||||
use tracing::*;
|
||||
use utils::failpoint_support;
|
||||
use utils::rate_limit::RateLimit;
|
||||
|
||||
use crate::context::RequestContext;
|
||||
use crate::metrics::WAL_INGEST;
|
||||
@@ -42,18 +50,11 @@ use crate::ZERO_PAGE;
|
||||
use pageserver_api::key::rel_block_to_key;
|
||||
use pageserver_api::record::NeonWalRecord;
|
||||
use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind};
|
||||
use pageserver_api::shard::ShardIdentity;
|
||||
use postgres_ffi::fsm_logical_to_physical;
|
||||
use postgres_ffi::pg_constants;
|
||||
use postgres_ffi::relfile_utils::{FSM_FORKNUM, INIT_FORKNUM, MAIN_FORKNUM, VISIBILITYMAP_FORKNUM};
|
||||
use postgres_ffi::walrecord::*;
|
||||
use postgres_ffi::TransactionId;
|
||||
use postgres_ffi::{dispatch_pgversion, enum_pgversion, enum_pgversion_dispatch, TimestampTz};
|
||||
use utils::bin_ser::SerializeError;
|
||||
use utils::lsn::Lsn;
|
||||
use utils::rate_limit::RateLimit;
|
||||
use utils::{critical, failpoint_support};
|
||||
use wal_decoder::models::*;
|
||||
|
||||
enum_pgversion! {CheckPoint, pgv::CheckPoint}
|
||||
|
||||
@@ -326,75 +327,93 @@ impl WalIngest {
|
||||
let mut new_vm_blk = new_heap_blkno.map(pg_constants::HEAPBLK_TO_MAPBLOCK);
|
||||
let mut old_vm_blk = old_heap_blkno.map(pg_constants::HEAPBLK_TO_MAPBLOCK);
|
||||
|
||||
// VM bits can only be cleared on the shard(s) owning the VM relation, and must be within
|
||||
// its view of the VM relation size. Out of caution, error instead of failing WAL ingestion,
|
||||
// as there has historically been cases where PostgreSQL has cleared spurious VM pages. See:
|
||||
// https://github.com/neondatabase/neon/pull/10634.
|
||||
// Sometimes, Postgres seems to create heap WAL records with the
|
||||
// ALL_VISIBLE_CLEARED flag set, even though the bit in the VM page is
|
||||
// not set. In fact, it's possible that the VM page does not exist at all.
|
||||
// In that case, we don't want to store a record to clear the VM bit;
|
||||
// replaying it would fail to find the previous image of the page, because
|
||||
// it doesn't exist. So check if the VM page(s) exist, and skip the WAL
|
||||
// record if it doesn't.
|
||||
//
|
||||
// TODO: analyze the metrics and tighten this up accordingly. This logic
|
||||
// implicitly assumes that VM pages see explicit WAL writes before
|
||||
// implicit ClearVmBits, and will otherwise silently drop updates.
|
||||
let Some(vm_size) = get_relsize(modification, vm_rel, ctx).await? else {
|
||||
critical!("clear_vm_bits for unknown VM relation {vm_rel}");
|
||||
WAL_INGEST
|
||||
.clear_vm_bits_unknown
|
||||
.with_label_values(&["relation"])
|
||||
.inc();
|
||||
return Ok(());
|
||||
};
|
||||
if let Some(blknum) = new_vm_blk {
|
||||
if blknum >= vm_size {
|
||||
critical!("new_vm_blk {blknum} not in {vm_rel} of size {vm_size}");
|
||||
WAL_INGEST
|
||||
.clear_vm_bits_unknown
|
||||
.with_label_values(&["new_page"])
|
||||
.inc();
|
||||
new_vm_blk = None;
|
||||
}
|
||||
}
|
||||
if let Some(blknum) = old_vm_blk {
|
||||
if blknum >= vm_size {
|
||||
critical!("old_vm_blk {blknum} not in {vm_rel} of size {vm_size}");
|
||||
WAL_INGEST
|
||||
.clear_vm_bits_unknown
|
||||
.with_label_values(&["old_page"])
|
||||
.inc();
|
||||
old_vm_blk = None;
|
||||
}
|
||||
}
|
||||
|
||||
if new_vm_blk.is_none() && old_vm_blk.is_none() {
|
||||
return Ok(());
|
||||
} else if new_vm_blk == old_vm_blk {
|
||||
// An UPDATE record that needs to clear the bits for both old and the new page, both of
|
||||
// which reside on the same VM page.
|
||||
self.put_rel_wal_record(
|
||||
modification,
|
||||
vm_rel,
|
||||
new_vm_blk.unwrap(),
|
||||
NeonWalRecord::ClearVisibilityMapFlags {
|
||||
new_heap_blkno,
|
||||
old_heap_blkno,
|
||||
flags,
|
||||
},
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
} else {
|
||||
// Clear VM bits for one heap page, or for two pages that reside on different VM pages.
|
||||
if let Some(new_vm_blk) = new_vm_blk {
|
||||
if new_vm_blk.is_some() || old_vm_blk.is_some() {
|
||||
if new_vm_blk == old_vm_blk {
|
||||
// An UPDATE record that needs to clear the bits for both old and the
|
||||
// new page, both of which reside on the same VM page.
|
||||
self.put_rel_wal_record(
|
||||
modification,
|
||||
vm_rel,
|
||||
new_vm_blk,
|
||||
new_vm_blk.unwrap(),
|
||||
NeonWalRecord::ClearVisibilityMapFlags {
|
||||
new_heap_blkno,
|
||||
old_heap_blkno: None,
|
||||
flags,
|
||||
},
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
if let Some(old_vm_blk) = old_vm_blk {
|
||||
self.put_rel_wal_record(
|
||||
modification,
|
||||
vm_rel,
|
||||
old_vm_blk,
|
||||
NeonWalRecord::ClearVisibilityMapFlags {
|
||||
new_heap_blkno: None,
|
||||
old_heap_blkno,
|
||||
flags,
|
||||
},
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
} else {
|
||||
// Clear VM bits for one heap page, or for two pages that reside on
|
||||
// different VM pages.
|
||||
if let Some(new_vm_blk) = new_vm_blk {
|
||||
self.put_rel_wal_record(
|
||||
modification,
|
||||
vm_rel,
|
||||
new_vm_blk,
|
||||
NeonWalRecord::ClearVisibilityMapFlags {
|
||||
new_heap_blkno,
|
||||
old_heap_blkno: None,
|
||||
flags,
|
||||
},
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
if let Some(old_vm_blk) = old_vm_blk {
|
||||
self.put_rel_wal_record(
|
||||
modification,
|
||||
vm_rel,
|
||||
old_vm_blk,
|
||||
NeonWalRecord::ClearVisibilityMapFlags {
|
||||
new_heap_blkno: None,
|
||||
old_heap_blkno,
|
||||
flags,
|
||||
},
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
@@ -220,8 +220,10 @@ lfc_maybe_disabled(void)
|
||||
static bool
|
||||
lfc_ensure_opened(void)
|
||||
{
|
||||
bool enabled = !lfc_maybe_disabled();
|
||||
|
||||
/* Open cache file if not done yet */
|
||||
if (lfc_desc <= 0)
|
||||
if (lfc_desc <= 0 && enabled)
|
||||
{
|
||||
lfc_desc = BasicOpenFile(lfc_path, O_RDWR);
|
||||
|
||||
@@ -231,7 +233,7 @@ lfc_ensure_opened(void)
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
return enabled;
|
||||
}
|
||||
|
||||
static void
|
||||
@@ -336,11 +338,10 @@ lfc_change_limit_hook(int newval, void *extra)
|
||||
{
|
||||
uint32 new_size = SIZE_MB_TO_CHUNKS(newval);
|
||||
|
||||
if (!lfc_ctl || !is_normal_backend())
|
||||
if (!is_normal_backend())
|
||||
return;
|
||||
|
||||
/* Open LFC file only if LFC was enabled or we are going to reenable it */
|
||||
if ((newval > 0 || LFC_ENABLED()) && !lfc_ensure_opened())
|
||||
if (!lfc_ensure_opened())
|
||||
return;
|
||||
|
||||
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
|
||||
|
||||
@@ -19,7 +19,6 @@ aws-config.workspace = true
|
||||
aws-sdk-iam.workspace = true
|
||||
aws-sigv4.workspace = true
|
||||
base64.workspace = true
|
||||
boxcar = "0.2.8"
|
||||
bstr.workspace = true
|
||||
bytes = { workspace = true, features = ["serde"] }
|
||||
camino.workspace = true
|
||||
@@ -43,7 +42,6 @@ hyper0.workspace = true
|
||||
hyper = { workspace = true, features = ["server", "http1", "http2"] }
|
||||
hyper-util = { version = "0.1", features = ["server", "http1", "http2", "tokio"] }
|
||||
http-body-util = { version = "0.1" }
|
||||
gettid = "0.1.3"
|
||||
indexmap = { workspace = true, features = ["serde"] }
|
||||
ipnet.workspace = true
|
||||
itertools.workspace = true
|
||||
@@ -52,8 +50,6 @@ lasso = { workspace = true, features = ["multi-threaded"] }
|
||||
measured = { workspace = true, features = ["lasso"] }
|
||||
metrics.workspace = true
|
||||
once_cell.workspace = true
|
||||
opentelemetry = { workspace = true, features = ["trace"] }
|
||||
papaya = "0.1.8"
|
||||
parking_lot.workspace = true
|
||||
parquet.workspace = true
|
||||
parquet_derive.workspace = true
|
||||
@@ -93,9 +89,6 @@ tokio = { workspace = true, features = ["signal"] }
|
||||
tracing-subscriber.workspace = true
|
||||
tracing-utils.workspace = true
|
||||
tracing.workspace = true
|
||||
tracing-log.workspace = true
|
||||
tracing-serde.workspace = true
|
||||
tracing-opentelemetry.workspace = true
|
||||
try-lock.workspace = true
|
||||
typed-json.workspace = true
|
||||
url.workspace = true
|
||||
@@ -119,7 +112,6 @@ rsa = "0.9"
|
||||
workspace_hack.workspace = true
|
||||
|
||||
[dev-dependencies]
|
||||
assert-json-diff.workspace = true
|
||||
camino-tempfile.workspace = true
|
||||
fallible-iterator.workspace = true
|
||||
flate2.workspace = true
|
||||
|
||||
@@ -1,23 +1,10 @@
|
||||
use std::cell::{Cell, RefCell};
|
||||
use std::collections::HashMap;
|
||||
use std::hash::BuildHasher;
|
||||
use std::{env, io};
|
||||
|
||||
use chrono::{DateTime, Utc};
|
||||
use opentelemetry::trace::TraceContextExt;
|
||||
use scopeguard::defer;
|
||||
use serde::ser::{SerializeMap, Serializer};
|
||||
use tracing::span;
|
||||
use tracing::subscriber::Interest;
|
||||
use tracing::{callsite, Event, Metadata, Span, Subscriber};
|
||||
use tracing_opentelemetry::OpenTelemetrySpanExt;
|
||||
use tracing::Subscriber;
|
||||
use tracing_subscriber::filter::{EnvFilter, LevelFilter};
|
||||
use tracing_subscriber::fmt::format::{Format, Full};
|
||||
use tracing_subscriber::fmt::time::SystemTime;
|
||||
use tracing_subscriber::fmt::{FormatEvent, FormatFields};
|
||||
use tracing_subscriber::layer::{Context, Layer};
|
||||
use tracing_subscriber::prelude::*;
|
||||
use tracing_subscriber::registry::{LookupSpan, SpanRef};
|
||||
use tracing_subscriber::registry::LookupSpan;
|
||||
|
||||
/// Initialize logging and OpenTelemetry tracing and exporter.
|
||||
///
|
||||
@@ -28,8 +15,6 @@ use tracing_subscriber::registry::{LookupSpan, SpanRef};
|
||||
/// destination, set `OTEL_EXPORTER_OTLP_ENDPOINT=http://jaeger:4318`.
|
||||
/// See <https://opentelemetry.io/docs/reference/specification/sdk-environment-variables>
|
||||
pub async fn init() -> anyhow::Result<LoggingGuard> {
|
||||
let logfmt = LogFormat::from_env()?;
|
||||
|
||||
let env_filter = EnvFilter::builder()
|
||||
.with_default_directive(LevelFilter::INFO.into())
|
||||
.from_env_lossy()
|
||||
@@ -44,36 +29,17 @@ pub async fn init() -> anyhow::Result<LoggingGuard> {
|
||||
.expect("this should be a valid filter directive"),
|
||||
);
|
||||
|
||||
let fmt_layer = tracing_subscriber::fmt::layer()
|
||||
.with_ansi(false)
|
||||
.with_writer(std::io::stderr)
|
||||
.with_target(false);
|
||||
|
||||
let otlp_layer = tracing_utils::init_tracing("proxy").await;
|
||||
|
||||
let json_log_layer = if logfmt == LogFormat::Json {
|
||||
Some(JsonLoggingLayer {
|
||||
clock: RealClock,
|
||||
skipped_field_indices: papaya::HashMap::default(),
|
||||
writer: StderrWriter {
|
||||
stderr: std::io::stderr(),
|
||||
},
|
||||
})
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
let text_log_layer = if logfmt == LogFormat::Text {
|
||||
Some(
|
||||
tracing_subscriber::fmt::layer()
|
||||
.with_ansi(false)
|
||||
.with_writer(std::io::stderr)
|
||||
.with_target(false),
|
||||
)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
tracing_subscriber::registry()
|
||||
.with(env_filter)
|
||||
.with(otlp_layer)
|
||||
.with(json_log_layer)
|
||||
.with(text_log_layer)
|
||||
.with(fmt_layer)
|
||||
.try_init()?;
|
||||
|
||||
Ok(LoggingGuard)
|
||||
@@ -128,857 +94,3 @@ impl Drop for LoggingGuard {
|
||||
tracing_utils::shutdown_tracing();
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: make JSON the default
|
||||
#[derive(Copy, Clone, PartialEq, Eq, Default, Debug)]
|
||||
enum LogFormat {
|
||||
#[default]
|
||||
Text = 1,
|
||||
Json,
|
||||
}
|
||||
|
||||
impl LogFormat {
|
||||
fn from_env() -> anyhow::Result<Self> {
|
||||
let logfmt = env::var("LOGFMT");
|
||||
Ok(match logfmt.as_deref() {
|
||||
Err(_) => LogFormat::default(),
|
||||
Ok("text") => LogFormat::Text,
|
||||
Ok("json") => LogFormat::Json,
|
||||
Ok(logfmt) => anyhow::bail!("unknown log format: {logfmt}"),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
trait MakeWriter {
|
||||
fn make_writer(&self) -> impl io::Write;
|
||||
}
|
||||
|
||||
struct StderrWriter {
|
||||
stderr: io::Stderr,
|
||||
}
|
||||
|
||||
impl MakeWriter for StderrWriter {
|
||||
#[inline]
|
||||
fn make_writer(&self) -> impl io::Write {
|
||||
self.stderr.lock()
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: move into separate module or even separate crate.
|
||||
trait Clock {
|
||||
fn now(&self) -> DateTime<Utc>;
|
||||
}
|
||||
|
||||
struct RealClock;
|
||||
|
||||
impl Clock for RealClock {
|
||||
#[inline]
|
||||
fn now(&self) -> DateTime<Utc> {
|
||||
Utc::now()
|
||||
}
|
||||
}
|
||||
|
||||
/// Name of the field used by tracing crate to store the event message.
|
||||
const MESSAGE_FIELD: &str = "message";
|
||||
|
||||
thread_local! {
|
||||
/// Protects against deadlocks and double panics during log writing.
|
||||
/// The current panic handler will use tracing to log panic information.
|
||||
static REENTRANCY_GUARD: Cell<bool> = const { Cell::new(false) };
|
||||
/// Thread-local instance with per-thread buffer for log writing.
|
||||
static EVENT_FORMATTER: RefCell<EventFormatter> = RefCell::new(EventFormatter::new());
|
||||
/// Cached OS thread ID.
|
||||
static THREAD_ID: u64 = gettid::gettid();
|
||||
}
|
||||
|
||||
/// Implements tracing layer to handle events specific to logging.
|
||||
struct JsonLoggingLayer<C: Clock, W: MakeWriter> {
|
||||
clock: C,
|
||||
skipped_field_indices: papaya::HashMap<callsite::Identifier, SkippedFieldIndices>,
|
||||
writer: W,
|
||||
}
|
||||
|
||||
impl<S, C: Clock + 'static, W: MakeWriter + 'static> Layer<S> for JsonLoggingLayer<C, W>
|
||||
where
|
||||
S: Subscriber + for<'a> LookupSpan<'a>,
|
||||
{
|
||||
fn on_event(&self, event: &Event<'_>, ctx: Context<'_, S>) {
|
||||
use std::io::Write;
|
||||
|
||||
// TODO: consider special tracing subscriber to grab timestamp very
|
||||
// early, before OTel machinery, and add as event extension.
|
||||
let now = self.clock.now();
|
||||
|
||||
let res: io::Result<()> = REENTRANCY_GUARD.with(move |entered| {
|
||||
if entered.get() {
|
||||
let mut formatter = EventFormatter::new();
|
||||
formatter.format(now, event, &ctx, &self.skipped_field_indices)?;
|
||||
self.writer.make_writer().write_all(formatter.buffer())
|
||||
} else {
|
||||
entered.set(true);
|
||||
defer!(entered.set(false););
|
||||
|
||||
EVENT_FORMATTER.with_borrow_mut(move |formatter| {
|
||||
formatter.reset();
|
||||
formatter.format(now, event, &ctx, &self.skipped_field_indices)?;
|
||||
self.writer.make_writer().write_all(formatter.buffer())
|
||||
})
|
||||
}
|
||||
});
|
||||
|
||||
// In case logging fails we generate a simpler JSON object.
|
||||
if let Err(err) = res {
|
||||
if let Ok(mut line) = serde_json::to_vec(&serde_json::json!( {
|
||||
"timestamp": now.to_rfc3339_opts(chrono::SecondsFormat::Micros, true),
|
||||
"level": "ERROR",
|
||||
"message": format_args!("cannot log event: {err:?}"),
|
||||
"fields": {
|
||||
"event": format_args!("{event:?}"),
|
||||
},
|
||||
})) {
|
||||
line.push(b'\n');
|
||||
self.writer.make_writer().write_all(&line).ok();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Registers a SpanFields instance as span extension.
|
||||
fn on_new_span(&self, attrs: &span::Attributes<'_>, id: &span::Id, ctx: Context<'_, S>) {
|
||||
let span = ctx.span(id).expect("span must exist");
|
||||
let fields = SpanFields::default();
|
||||
fields.record_fields(attrs);
|
||||
// This could deadlock when there's a panic somewhere in the tracing
|
||||
// event handling and a read or write guard is still held. This includes
|
||||
// the OTel subscriber.
|
||||
span.extensions_mut().insert(fields);
|
||||
}
|
||||
|
||||
fn on_record(&self, id: &span::Id, values: &span::Record<'_>, ctx: Context<'_, S>) {
|
||||
let span = ctx.span(id).expect("span must exist");
|
||||
let ext = span.extensions();
|
||||
if let Some(data) = ext.get::<SpanFields>() {
|
||||
data.record_fields(values);
|
||||
}
|
||||
}
|
||||
|
||||
/// Called (lazily) whenever a new log call is executed. We quickly check
|
||||
/// for duplicate field names and record duplicates as skippable. Last one
|
||||
/// wins.
|
||||
fn register_callsite(&self, metadata: &'static Metadata<'static>) -> Interest {
|
||||
if !metadata.is_event() {
|
||||
// Must not be never because we wouldn't get trace and span data.
|
||||
return Interest::always();
|
||||
}
|
||||
|
||||
let mut field_indices = SkippedFieldIndices::default();
|
||||
let mut seen_fields = HashMap::<&'static str, usize>::new();
|
||||
for field in metadata.fields() {
|
||||
use std::collections::hash_map::Entry;
|
||||
match seen_fields.entry(field.name()) {
|
||||
Entry::Vacant(entry) => {
|
||||
// field not seen yet
|
||||
entry.insert(field.index());
|
||||
}
|
||||
Entry::Occupied(mut entry) => {
|
||||
// replace currently stored index
|
||||
let old_index = entry.insert(field.index());
|
||||
// ... and append it to list of skippable indices
|
||||
field_indices.push(old_index);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if !field_indices.is_empty() {
|
||||
self.skipped_field_indices
|
||||
.pin()
|
||||
.insert(metadata.callsite(), field_indices);
|
||||
}
|
||||
|
||||
Interest::always()
|
||||
}
|
||||
}
|
||||
|
||||
/// Stores span field values recorded during the spans lifetime.
|
||||
#[derive(Default)]
|
||||
struct SpanFields {
|
||||
// TODO: Switch to custom enum with lasso::Spur for Strings?
|
||||
fields: papaya::HashMap<&'static str, serde_json::Value>,
|
||||
}
|
||||
|
||||
impl SpanFields {
|
||||
#[inline]
|
||||
fn record_fields<R: tracing_subscriber::field::RecordFields>(&self, fields: R) {
|
||||
fields.record(&mut SpanFieldsRecorder {
|
||||
fields: self.fields.pin(),
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
/// Implements a tracing field visitor to convert and store values.
|
||||
struct SpanFieldsRecorder<'m, S, G> {
|
||||
fields: papaya::HashMapRef<'m, &'static str, serde_json::Value, S, G>,
|
||||
}
|
||||
|
||||
impl<S: BuildHasher, G: papaya::Guard> tracing::field::Visit for SpanFieldsRecorder<'_, S, G> {
|
||||
#[inline]
|
||||
fn record_f64(&mut self, field: &tracing::field::Field, value: f64) {
|
||||
self.fields
|
||||
.insert(field.name(), serde_json::Value::from(value));
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn record_i64(&mut self, field: &tracing::field::Field, value: i64) {
|
||||
self.fields
|
||||
.insert(field.name(), serde_json::Value::from(value));
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
|
||||
self.fields
|
||||
.insert(field.name(), serde_json::Value::from(value));
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn record_i128(&mut self, field: &tracing::field::Field, value: i128) {
|
||||
if let Ok(value) = i64::try_from(value) {
|
||||
self.fields
|
||||
.insert(field.name(), serde_json::Value::from(value));
|
||||
} else {
|
||||
self.fields
|
||||
.insert(field.name(), serde_json::Value::from(format!("{value}")));
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn record_u128(&mut self, field: &tracing::field::Field, value: u128) {
|
||||
if let Ok(value) = u64::try_from(value) {
|
||||
self.fields
|
||||
.insert(field.name(), serde_json::Value::from(value));
|
||||
} else {
|
||||
self.fields
|
||||
.insert(field.name(), serde_json::Value::from(format!("{value}")));
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn record_bool(&mut self, field: &tracing::field::Field, value: bool) {
|
||||
self.fields
|
||||
.insert(field.name(), serde_json::Value::from(value));
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn record_bytes(&mut self, field: &tracing::field::Field, value: &[u8]) {
|
||||
self.fields
|
||||
.insert(field.name(), serde_json::Value::from(value));
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
|
||||
self.fields
|
||||
.insert(field.name(), serde_json::Value::from(value));
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
|
||||
self.fields
|
||||
.insert(field.name(), serde_json::Value::from(format!("{value:?}")));
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn record_error(
|
||||
&mut self,
|
||||
field: &tracing::field::Field,
|
||||
value: &(dyn std::error::Error + 'static),
|
||||
) {
|
||||
self.fields
|
||||
.insert(field.name(), serde_json::Value::from(format!("{value}")));
|
||||
}
|
||||
}
|
||||
|
||||
/// List of field indices skipped during logging. Can list duplicate fields or
|
||||
/// metafields not meant to be logged.
|
||||
#[derive(Clone, Default)]
|
||||
struct SkippedFieldIndices {
|
||||
bits: u64,
|
||||
}
|
||||
|
||||
impl SkippedFieldIndices {
|
||||
#[inline]
|
||||
fn is_empty(&self) -> bool {
|
||||
self.bits == 0
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn push(&mut self, index: usize) {
|
||||
self.bits |= 1u64
|
||||
.checked_shl(index as u32)
|
||||
.expect("field index too large");
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn contains(&self, index: usize) -> bool {
|
||||
self.bits
|
||||
& 1u64
|
||||
.checked_shl(index as u32)
|
||||
.expect("field index too large")
|
||||
!= 0
|
||||
}
|
||||
}
|
||||
|
||||
/// Formats a tracing event and writes JSON to its internal buffer including a newline.
|
||||
// TODO: buffer capacity management, truncate if too large
|
||||
struct EventFormatter {
|
||||
logline_buffer: Vec<u8>,
|
||||
}
|
||||
|
||||
impl EventFormatter {
|
||||
#[inline]
|
||||
fn new() -> Self {
|
||||
EventFormatter {
|
||||
logline_buffer: Vec::new(),
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn buffer(&self) -> &[u8] {
|
||||
&self.logline_buffer
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn reset(&mut self) {
|
||||
self.logline_buffer.clear();
|
||||
}
|
||||
|
||||
fn format<S>(
|
||||
&mut self,
|
||||
now: DateTime<Utc>,
|
||||
event: &Event<'_>,
|
||||
ctx: &Context<'_, S>,
|
||||
skipped_field_indices: &papaya::HashMap<callsite::Identifier, SkippedFieldIndices>,
|
||||
) -> io::Result<()>
|
||||
where
|
||||
S: Subscriber + for<'a> LookupSpan<'a>,
|
||||
{
|
||||
let timestamp = now.to_rfc3339_opts(chrono::SecondsFormat::Micros, true);
|
||||
|
||||
use tracing_log::NormalizeEvent;
|
||||
let normalized_meta = event.normalized_metadata();
|
||||
let meta = normalized_meta.as_ref().unwrap_or_else(|| event.metadata());
|
||||
|
||||
let skipped_field_indices = skipped_field_indices.pin();
|
||||
let skipped_field_indices = skipped_field_indices.get(&meta.callsite());
|
||||
|
||||
let mut serialize = || {
|
||||
let mut serializer = serde_json::Serializer::new(&mut self.logline_buffer);
|
||||
|
||||
let mut serializer = serializer.serialize_map(None)?;
|
||||
|
||||
// Timestamp comes first, so raw lines can be sorted by timestamp.
|
||||
serializer.serialize_entry("timestamp", ×tamp)?;
|
||||
|
||||
// Level next.
|
||||
serializer.serialize_entry("level", &meta.level().as_str())?;
|
||||
|
||||
// Message next.
|
||||
serializer.serialize_key("message")?;
|
||||
let mut message_extractor =
|
||||
MessageFieldExtractor::new(serializer, skipped_field_indices);
|
||||
event.record(&mut message_extractor);
|
||||
let mut serializer = message_extractor.into_serializer()?;
|
||||
|
||||
let mut fields_present = FieldsPresent(false, skipped_field_indices);
|
||||
event.record(&mut fields_present);
|
||||
if fields_present.0 {
|
||||
serializer.serialize_entry(
|
||||
"fields",
|
||||
&SerializableEventFields(event, skipped_field_indices),
|
||||
)?;
|
||||
}
|
||||
|
||||
let pid = std::process::id();
|
||||
if pid != 1 {
|
||||
serializer.serialize_entry("process_id", &pid)?;
|
||||
}
|
||||
|
||||
THREAD_ID.with(|tid| serializer.serialize_entry("thread_id", tid))?;
|
||||
|
||||
// TODO: tls cache? name could change
|
||||
if let Some(thread_name) = std::thread::current().name() {
|
||||
if !thread_name.is_empty() && thread_name != "tokio-runtime-worker" {
|
||||
serializer.serialize_entry("thread_name", thread_name)?;
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(task_id) = tokio::task::try_id() {
|
||||
serializer.serialize_entry("task_id", &format_args!("{task_id}"))?;
|
||||
}
|
||||
|
||||
serializer.serialize_entry("target", meta.target())?;
|
||||
|
||||
if let Some(module) = meta.module_path() {
|
||||
if module != meta.target() {
|
||||
serializer.serialize_entry("module", module)?;
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(file) = meta.file() {
|
||||
if let Some(line) = meta.line() {
|
||||
serializer.serialize_entry("src", &format_args!("{file}:{line}"))?;
|
||||
} else {
|
||||
serializer.serialize_entry("src", file)?;
|
||||
}
|
||||
}
|
||||
|
||||
{
|
||||
let otel_context = Span::current().context();
|
||||
let otel_spanref = otel_context.span();
|
||||
let span_context = otel_spanref.span_context();
|
||||
if span_context.is_valid() {
|
||||
serializer.serialize_entry(
|
||||
"trace_id",
|
||||
&format_args!("{}", span_context.trace_id()),
|
||||
)?;
|
||||
}
|
||||
}
|
||||
|
||||
serializer.serialize_entry("spans", &SerializableSpanStack(ctx))?;
|
||||
|
||||
serializer.end()
|
||||
};
|
||||
|
||||
serialize().map_err(io::Error::other)?;
|
||||
self.logline_buffer.push(b'\n');
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// Extracts the message field that's mixed will other fields.
|
||||
struct MessageFieldExtractor<'a, S: serde::ser::SerializeMap> {
|
||||
serializer: S,
|
||||
skipped_field_indices: Option<&'a SkippedFieldIndices>,
|
||||
state: Option<Result<(), S::Error>>,
|
||||
}
|
||||
|
||||
impl<'a, S: serde::ser::SerializeMap> MessageFieldExtractor<'a, S> {
|
||||
#[inline]
|
||||
fn new(serializer: S, skipped_field_indices: Option<&'a SkippedFieldIndices>) -> Self {
|
||||
Self {
|
||||
serializer,
|
||||
skipped_field_indices,
|
||||
state: None,
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn into_serializer(mut self) -> Result<S, S::Error> {
|
||||
match self.state {
|
||||
Some(Ok(())) => {}
|
||||
Some(Err(err)) => return Err(err),
|
||||
None => self.serializer.serialize_value("")?,
|
||||
}
|
||||
Ok(self.serializer)
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn accept_field(&self, field: &tracing::field::Field) -> bool {
|
||||
self.state.is_none()
|
||||
&& field.name() == MESSAGE_FIELD
|
||||
&& !self
|
||||
.skipped_field_indices
|
||||
.is_some_and(|i| i.contains(field.index()))
|
||||
}
|
||||
}
|
||||
|
||||
impl<S: serde::ser::SerializeMap> tracing::field::Visit for MessageFieldExtractor<'_, S> {
|
||||
#[inline]
|
||||
fn record_f64(&mut self, field: &tracing::field::Field, value: f64) {
|
||||
if self.accept_field(field) {
|
||||
self.state = Some(self.serializer.serialize_value(&value));
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn record_i64(&mut self, field: &tracing::field::Field, value: i64) {
|
||||
if self.accept_field(field) {
|
||||
self.state = Some(self.serializer.serialize_value(&value));
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
|
||||
if self.accept_field(field) {
|
||||
self.state = Some(self.serializer.serialize_value(&value));
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn record_i128(&mut self, field: &tracing::field::Field, value: i128) {
|
||||
if self.accept_field(field) {
|
||||
self.state = Some(self.serializer.serialize_value(&value));
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn record_u128(&mut self, field: &tracing::field::Field, value: u128) {
|
||||
if self.accept_field(field) {
|
||||
self.state = Some(self.serializer.serialize_value(&value));
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn record_bool(&mut self, field: &tracing::field::Field, value: bool) {
|
||||
if self.accept_field(field) {
|
||||
self.state = Some(self.serializer.serialize_value(&value));
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn record_bytes(&mut self, field: &tracing::field::Field, value: &[u8]) {
|
||||
if self.accept_field(field) {
|
||||
self.state = Some(self.serializer.serialize_value(&format_args!("{value:x?}")));
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
|
||||
if self.accept_field(field) {
|
||||
self.state = Some(self.serializer.serialize_value(&value));
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
|
||||
if self.accept_field(field) {
|
||||
self.state = Some(self.serializer.serialize_value(&format_args!("{value:?}")));
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn record_error(
|
||||
&mut self,
|
||||
field: &tracing::field::Field,
|
||||
value: &(dyn std::error::Error + 'static),
|
||||
) {
|
||||
if self.accept_field(field) {
|
||||
self.state = Some(self.serializer.serialize_value(&format_args!("{value}")));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Checks if there's any fields and field values present. If not, the JSON subobject
|
||||
/// can be skipped.
|
||||
// This is entirely optional and only cosmetic, though maybe helps a
|
||||
// bit during log parsing in dashboards when there's no field with empty object.
|
||||
struct FieldsPresent<'a>(pub bool, Option<&'a SkippedFieldIndices>);
|
||||
|
||||
// Even though some methods have an overhead (error, bytes) it is assumed the
|
||||
// compiler won't include this since we ignore the value entirely.
|
||||
impl tracing::field::Visit for FieldsPresent<'_> {
|
||||
#[inline]
|
||||
fn record_debug(&mut self, field: &tracing::field::Field, _: &dyn std::fmt::Debug) {
|
||||
if !self.1.is_some_and(|i| i.contains(field.index()))
|
||||
&& field.name() != MESSAGE_FIELD
|
||||
&& !field.name().starts_with("log.")
|
||||
{
|
||||
self.0 |= true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Serializes the fields directly supplied with a log event.
|
||||
struct SerializableEventFields<'a, 'event>(
|
||||
&'a tracing::Event<'event>,
|
||||
Option<&'a SkippedFieldIndices>,
|
||||
);
|
||||
|
||||
impl serde::ser::Serialize for SerializableEventFields<'_, '_> {
|
||||
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
|
||||
where
|
||||
S: Serializer,
|
||||
{
|
||||
use serde::ser::SerializeMap;
|
||||
let serializer = serializer.serialize_map(None)?;
|
||||
let mut message_skipper = MessageFieldSkipper::new(serializer, self.1);
|
||||
self.0.record(&mut message_skipper);
|
||||
let serializer = message_skipper.into_serializer()?;
|
||||
serializer.end()
|
||||
}
|
||||
}
|
||||
|
||||
/// A tracing field visitor that skips the message field.
|
||||
struct MessageFieldSkipper<'a, S: serde::ser::SerializeMap> {
|
||||
serializer: S,
|
||||
skipped_field_indices: Option<&'a SkippedFieldIndices>,
|
||||
state: Result<(), S::Error>,
|
||||
}
|
||||
|
||||
impl<'a, S: serde::ser::SerializeMap> MessageFieldSkipper<'a, S> {
|
||||
#[inline]
|
||||
fn new(serializer: S, skipped_field_indices: Option<&'a SkippedFieldIndices>) -> Self {
|
||||
Self {
|
||||
serializer,
|
||||
skipped_field_indices,
|
||||
state: Ok(()),
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn accept_field(&self, field: &tracing::field::Field) -> bool {
|
||||
self.state.is_ok()
|
||||
&& field.name() != MESSAGE_FIELD
|
||||
&& !field.name().starts_with("log.")
|
||||
&& !self
|
||||
.skipped_field_indices
|
||||
.is_some_and(|i| i.contains(field.index()))
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn into_serializer(self) -> Result<S, S::Error> {
|
||||
self.state?;
|
||||
Ok(self.serializer)
|
||||
}
|
||||
}
|
||||
|
||||
impl<S: serde::ser::SerializeMap> tracing::field::Visit for MessageFieldSkipper<'_, S> {
|
||||
#[inline]
|
||||
fn record_f64(&mut self, field: &tracing::field::Field, value: f64) {
|
||||
if self.accept_field(field) {
|
||||
self.state = self.serializer.serialize_entry(field.name(), &value);
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn record_i64(&mut self, field: &tracing::field::Field, value: i64) {
|
||||
if self.accept_field(field) {
|
||||
self.state = self.serializer.serialize_entry(field.name(), &value);
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
|
||||
if self.accept_field(field) {
|
||||
self.state = self.serializer.serialize_entry(field.name(), &value);
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn record_i128(&mut self, field: &tracing::field::Field, value: i128) {
|
||||
if self.accept_field(field) {
|
||||
self.state = self.serializer.serialize_entry(field.name(), &value);
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn record_u128(&mut self, field: &tracing::field::Field, value: u128) {
|
||||
if self.accept_field(field) {
|
||||
self.state = self.serializer.serialize_entry(field.name(), &value);
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn record_bool(&mut self, field: &tracing::field::Field, value: bool) {
|
||||
if self.accept_field(field) {
|
||||
self.state = self.serializer.serialize_entry(field.name(), &value);
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn record_bytes(&mut self, field: &tracing::field::Field, value: &[u8]) {
|
||||
if self.accept_field(field) {
|
||||
self.state = self
|
||||
.serializer
|
||||
.serialize_entry(field.name(), &format_args!("{value:x?}"));
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
|
||||
if self.accept_field(field) {
|
||||
self.state = self.serializer.serialize_entry(field.name(), &value);
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
|
||||
if self.accept_field(field) {
|
||||
self.state = self
|
||||
.serializer
|
||||
.serialize_entry(field.name(), &format_args!("{value:?}"));
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn record_error(
|
||||
&mut self,
|
||||
field: &tracing::field::Field,
|
||||
value: &(dyn std::error::Error + 'static),
|
||||
) {
|
||||
if self.accept_field(field) {
|
||||
self.state = self.serializer.serialize_value(&format_args!("{value}"));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Serializes the span stack from root to leaf (parent of event) enumerated
|
||||
/// inside an object where the keys are just the number padded with zeroes
|
||||
/// to retain sorting order.
|
||||
// The object is necessary because Loki cannot flatten arrays.
|
||||
struct SerializableSpanStack<'a, 'b, Span>(&'b Context<'a, Span>)
|
||||
where
|
||||
Span: Subscriber + for<'lookup> LookupSpan<'lookup>;
|
||||
|
||||
impl<Span> serde::ser::Serialize for SerializableSpanStack<'_, '_, Span>
|
||||
where
|
||||
Span: Subscriber + for<'lookup> LookupSpan<'lookup>,
|
||||
{
|
||||
fn serialize<Ser>(&self, serializer: Ser) -> Result<Ser::Ok, Ser::Error>
|
||||
where
|
||||
Ser: serde::ser::Serializer,
|
||||
{
|
||||
let mut serializer = serializer.serialize_map(None)?;
|
||||
|
||||
if let Some(leaf_span) = self.0.lookup_current() {
|
||||
for (i, span) in leaf_span.scope().from_root().enumerate() {
|
||||
serializer.serialize_entry(&format_args!("{i:02}"), &SerializableSpan(&span))?;
|
||||
}
|
||||
}
|
||||
|
||||
serializer.end()
|
||||
}
|
||||
}
|
||||
|
||||
/// Serializes a single span. Include the span ID, name and its fields as
|
||||
/// recorded up to this point.
|
||||
struct SerializableSpan<'a, 'b, Span>(&'b SpanRef<'a, Span>)
|
||||
where
|
||||
Span: for<'lookup> LookupSpan<'lookup>;
|
||||
|
||||
impl<Span> serde::ser::Serialize for SerializableSpan<'_, '_, Span>
|
||||
where
|
||||
Span: for<'lookup> LookupSpan<'lookup>,
|
||||
{
|
||||
fn serialize<Ser>(&self, serializer: Ser) -> Result<Ser::Ok, Ser::Error>
|
||||
where
|
||||
Ser: serde::ser::Serializer,
|
||||
{
|
||||
let mut serializer = serializer.serialize_map(None)?;
|
||||
// TODO: the span ID is probably only useful for debugging tracing.
|
||||
serializer.serialize_entry("span_id", &format_args!("{:016x}", self.0.id().into_u64()))?;
|
||||
serializer.serialize_entry("span_name", self.0.metadata().name())?;
|
||||
|
||||
let ext = self.0.extensions();
|
||||
if let Some(data) = ext.get::<SpanFields>() {
|
||||
for (key, value) in &data.fields.pin() {
|
||||
serializer.serialize_entry(key, value)?;
|
||||
}
|
||||
}
|
||||
|
||||
serializer.end()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
#[allow(clippy::unwrap_used)]
|
||||
mod tests {
|
||||
use std::sync::{Arc, Mutex, MutexGuard};
|
||||
|
||||
use assert_json_diff::assert_json_eq;
|
||||
use tracing::info_span;
|
||||
|
||||
use super::*;
|
||||
|
||||
struct TestClock {
|
||||
current_time: Mutex<DateTime<Utc>>,
|
||||
}
|
||||
|
||||
impl Clock for Arc<TestClock> {
|
||||
fn now(&self) -> DateTime<Utc> {
|
||||
*self.current_time.lock().expect("poisoned")
|
||||
}
|
||||
}
|
||||
|
||||
struct VecWriter<'a> {
|
||||
buffer: MutexGuard<'a, Vec<u8>>,
|
||||
}
|
||||
|
||||
impl MakeWriter for Arc<Mutex<Vec<u8>>> {
|
||||
fn make_writer(&self) -> impl io::Write {
|
||||
VecWriter {
|
||||
buffer: self.lock().expect("poisoned"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl io::Write for VecWriter<'_> {
|
||||
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
|
||||
self.buffer.write(buf)
|
||||
}
|
||||
|
||||
fn flush(&mut self) -> io::Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_field_collection() {
|
||||
let clock = Arc::new(TestClock {
|
||||
current_time: Mutex::new(Utc::now()),
|
||||
});
|
||||
let buffer = Arc::new(Mutex::new(Vec::new()));
|
||||
let log_layer = JsonLoggingLayer {
|
||||
clock: clock.clone(),
|
||||
skipped_field_indices: papaya::HashMap::default(),
|
||||
writer: buffer.clone(),
|
||||
};
|
||||
|
||||
let registry = tracing_subscriber::Registry::default().with(log_layer);
|
||||
|
||||
tracing::subscriber::with_default(registry, || {
|
||||
info_span!("span1", x = 40, x = 41, x = 42).in_scope(|| {
|
||||
info_span!("span2").in_scope(|| {
|
||||
tracing::error!(
|
||||
a = 1,
|
||||
a = 2,
|
||||
a = 3,
|
||||
message = "explicit message field",
|
||||
"implicit message field"
|
||||
);
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
let buffer = Arc::try_unwrap(buffer)
|
||||
.expect("no other reference")
|
||||
.into_inner()
|
||||
.expect("poisoned");
|
||||
let actual: serde_json::Value = serde_json::from_slice(&buffer).expect("valid JSON");
|
||||
let expected: serde_json::Value = serde_json::json!(
|
||||
{
|
||||
"timestamp": clock.now().to_rfc3339_opts(chrono::SecondsFormat::Micros, true),
|
||||
"level": "ERROR",
|
||||
"message": "explicit message field",
|
||||
"fields": {
|
||||
"a": 3,
|
||||
},
|
||||
"spans": {
|
||||
"00":{
|
||||
"span_id": "0000000000000001",
|
||||
"span_name": "span1",
|
||||
"x": 42,
|
||||
},
|
||||
"01": {
|
||||
"span_id": "0000000000000002",
|
||||
"span_name": "span2",
|
||||
}
|
||||
},
|
||||
"src": actual.as_object().unwrap().get("src").unwrap().as_str().unwrap(),
|
||||
"target": "proxy::logging::tests",
|
||||
"process_id": actual.as_object().unwrap().get("process_id").unwrap().as_number().unwrap(),
|
||||
"thread_id": actual.as_object().unwrap().get("thread_id").unwrap().as_number().unwrap(),
|
||||
"thread_name": "logging::tests::test_field_collection",
|
||||
}
|
||||
);
|
||||
|
||||
assert_json_eq!(actual, expected);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -225,7 +225,7 @@ pub(crate) enum NotifyError {
|
||||
// We shutdown while sending
|
||||
#[error("Shutting down")]
|
||||
ShuttingDown,
|
||||
// A response indicates we will never succeed, such as 400 or 403
|
||||
// A response indicates we will never succeed, such as 400 or 404
|
||||
#[error("Non-retryable error {0}")]
|
||||
Fatal(StatusCode),
|
||||
|
||||
|
||||
@@ -27,7 +27,7 @@ use pageserver_api::shard::ShardConfigError;
|
||||
use pageserver_api::shard::ShardIdentity;
|
||||
use pageserver_api::shard::ShardStripeSize;
|
||||
use pageserver_api::shard::{ShardCount, ShardNumber, TenantShardId};
|
||||
use rustls::client::danger::{ServerCertVerified, ServerCertVerifier};
|
||||
use rustls::client::danger::ServerCertVerifier;
|
||||
use rustls::client::WebPkiServerVerifier;
|
||||
use rustls::crypto::ring;
|
||||
use scoped_futures::ScopedBoxFuture;
|
||||
@@ -194,8 +194,6 @@ impl Persistence {
|
||||
timeout: Duration,
|
||||
) -> Result<(), diesel::ConnectionError> {
|
||||
let started_at = Instant::now();
|
||||
log_postgres_connstr_info(database_url)
|
||||
.map_err(|e| diesel::ConnectionError::InvalidConnectionUrl(e.to_string()))?;
|
||||
loop {
|
||||
match establish_connection_rustls(database_url).await {
|
||||
Ok(_) => {
|
||||
@@ -1283,51 +1281,6 @@ pub(crate) fn load_certs() -> anyhow::Result<Arc<rustls::RootCertStore>> {
|
||||
Ok(Arc::new(store))
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
/// A verifier that accepts all certificates (but logs an error still)
|
||||
struct AcceptAll(Arc<WebPkiServerVerifier>);
|
||||
impl ServerCertVerifier for AcceptAll {
|
||||
fn verify_server_cert(
|
||||
&self,
|
||||
end_entity: &rustls::pki_types::CertificateDer<'_>,
|
||||
intermediates: &[rustls::pki_types::CertificateDer<'_>],
|
||||
server_name: &rustls::pki_types::ServerName<'_>,
|
||||
ocsp_response: &[u8],
|
||||
now: rustls::pki_types::UnixTime,
|
||||
) -> Result<ServerCertVerified, rustls::Error> {
|
||||
let r =
|
||||
self.0
|
||||
.verify_server_cert(end_entity, intermediates, server_name, ocsp_response, now);
|
||||
if let Err(err) = r {
|
||||
tracing::info!(
|
||||
?server_name,
|
||||
"ignoring db connection TLS validation error: {err:?}"
|
||||
);
|
||||
return Ok(ServerCertVerified::assertion());
|
||||
}
|
||||
r
|
||||
}
|
||||
fn verify_tls12_signature(
|
||||
&self,
|
||||
message: &[u8],
|
||||
cert: &rustls::pki_types::CertificateDer<'_>,
|
||||
dss: &rustls::DigitallySignedStruct,
|
||||
) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
|
||||
self.0.verify_tls12_signature(message, cert, dss)
|
||||
}
|
||||
fn verify_tls13_signature(
|
||||
&self,
|
||||
message: &[u8],
|
||||
cert: &rustls::pki_types::CertificateDer<'_>,
|
||||
dss: &rustls::DigitallySignedStruct,
|
||||
) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
|
||||
self.0.verify_tls13_signature(message, cert, dss)
|
||||
}
|
||||
fn supported_verify_schemes(&self) -> Vec<rustls::SignatureScheme> {
|
||||
self.0.supported_verify_schemes()
|
||||
}
|
||||
}
|
||||
|
||||
/// Loads the root certificates and constructs a client config suitable for connecting.
|
||||
/// This function is blocking.
|
||||
fn client_config_with_root_certs() -> anyhow::Result<rustls::ClientConfig> {
|
||||
@@ -1337,12 +1290,76 @@ fn client_config_with_root_certs() -> anyhow::Result<rustls::ClientConfig> {
|
||||
.expect("ring should support the default protocol versions");
|
||||
static DO_CERT_CHECKS: std::sync::OnceLock<bool> = std::sync::OnceLock::new();
|
||||
let do_cert_checks =
|
||||
DO_CERT_CHECKS.get_or_init(|| std::env::var("STORCON_DB_CERT_CHECKS").is_ok());
|
||||
DO_CERT_CHECKS.get_or_init(|| std::env::var("STORCON_CERT_CHECKS").is_ok());
|
||||
Ok(if *do_cert_checks {
|
||||
client_config
|
||||
.with_root_certificates(load_certs()?)
|
||||
.with_no_client_auth()
|
||||
} else {
|
||||
use rustls::client::danger::{HandshakeSignatureValid, ServerCertVerified};
|
||||
#[derive(Debug)]
|
||||
struct AcceptAll(Arc<WebPkiServerVerifier>);
|
||||
impl ServerCertVerifier for AcceptAll {
|
||||
fn verify_server_cert(
|
||||
&self,
|
||||
end_entity: &rustls::pki_types::CertificateDer<'_>,
|
||||
intermediates: &[rustls::pki_types::CertificateDer<'_>],
|
||||
server_name: &rustls::pki_types::ServerName<'_>,
|
||||
ocsp_response: &[u8],
|
||||
now: rustls::pki_types::UnixTime,
|
||||
) -> Result<ServerCertVerified, rustls::Error> {
|
||||
let r = self.0.verify_server_cert(
|
||||
end_entity,
|
||||
intermediates,
|
||||
server_name,
|
||||
ocsp_response,
|
||||
now,
|
||||
);
|
||||
if let Err(err) = r {
|
||||
tracing::info!(
|
||||
?server_name,
|
||||
"ignoring db connection TLS validation error: {err:?}"
|
||||
);
|
||||
return Ok(ServerCertVerified::assertion());
|
||||
}
|
||||
r
|
||||
}
|
||||
fn verify_tls12_signature(
|
||||
&self,
|
||||
message: &[u8],
|
||||
cert: &rustls::pki_types::CertificateDer<'_>,
|
||||
dss: &rustls::DigitallySignedStruct,
|
||||
) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error>
|
||||
{
|
||||
let r = self.0.verify_tls12_signature(message, cert, dss);
|
||||
if let Err(err) = r {
|
||||
tracing::info!(
|
||||
"ignoring db connection 1.2 signature TLS validation error: {err:?}"
|
||||
);
|
||||
return Ok(HandshakeSignatureValid::assertion());
|
||||
}
|
||||
r
|
||||
}
|
||||
fn verify_tls13_signature(
|
||||
&self,
|
||||
message: &[u8],
|
||||
cert: &rustls::pki_types::CertificateDer<'_>,
|
||||
dss: &rustls::DigitallySignedStruct,
|
||||
) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error>
|
||||
{
|
||||
let r = self.0.verify_tls13_signature(message, cert, dss);
|
||||
if let Err(err) = r {
|
||||
tracing::info!(
|
||||
"ignoring db connection 1.3 signature TLS validation error: {err:?}"
|
||||
);
|
||||
return Ok(HandshakeSignatureValid::assertion());
|
||||
}
|
||||
r
|
||||
}
|
||||
fn supported_verify_schemes(&self) -> Vec<rustls::SignatureScheme> {
|
||||
self.0.supported_verify_schemes()
|
||||
}
|
||||
}
|
||||
let verifier = AcceptAll(
|
||||
WebPkiServerVerifier::builder_with_provider(
|
||||
load_certs()?,
|
||||
@@ -1372,29 +1389,6 @@ fn establish_connection_rustls(config: &str) -> BoxFuture<ConnectionResult<Async
|
||||
fut.boxed()
|
||||
}
|
||||
|
||||
#[cfg_attr(test, test)]
|
||||
fn test_config_debug_censors_password() {
|
||||
let has_pw =
|
||||
"host=/var/lib/postgresql,localhost port=1234 user=specialuser password='NOT ALLOWED TAG'";
|
||||
let has_pw_cfg = has_pw.parse::<tokio_postgres::Config>().unwrap();
|
||||
assert!(format!("{has_pw_cfg:?}").contains("specialuser"));
|
||||
// Ensure that the password is not leaked by the debug impl
|
||||
assert!(!format!("{has_pw_cfg:?}").contains("NOT ALLOWED TAG"));
|
||||
}
|
||||
|
||||
fn log_postgres_connstr_info(config_str: &str) -> anyhow::Result<()> {
|
||||
let config = config_str
|
||||
.parse::<tokio_postgres::Config>()
|
||||
.map_err(|_e| anyhow::anyhow!("Couldn't parse config str"))?;
|
||||
// We use debug formatting here, and use a unit test to ensure that we don't leak the password.
|
||||
// To make extra sure the test gets ran, run it every time the function is called
|
||||
// (this is rather cold code, we can afford it).
|
||||
#[cfg(not(test))]
|
||||
test_config_debug_censors_password();
|
||||
tracing::info!("database connection config: {config:?}");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Parts of [`crate::tenant_shard::TenantShard`] that are stored durably
|
||||
#[derive(
|
||||
QueryableByName, Queryable, Selectable, Insertable, Serialize, Deserialize, Clone, Eq, PartialEq,
|
||||
|
||||
@@ -115,15 +115,6 @@ impl ReconcilerConfigBuilder {
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn tenant_creation_hint(self, hint: bool) -> Self {
|
||||
Self {
|
||||
config: ReconcilerConfig {
|
||||
tenant_creation_hint: hint,
|
||||
..self.config
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn build(self) -> ReconcilerConfig {
|
||||
self.config
|
||||
}
|
||||
@@ -138,10 +129,6 @@ pub(crate) struct ReconcilerConfig {
|
||||
// During live migrations this is the amount of time that
|
||||
// the pagserver will hold our poll.
|
||||
secondary_download_request_timeout: Option<Duration>,
|
||||
|
||||
// A hint indicating whether this reconciliation is done on the
|
||||
// creation of a new tenant. This only informs logging behaviour.
|
||||
tenant_creation_hint: bool,
|
||||
}
|
||||
|
||||
impl ReconcilerConfig {
|
||||
@@ -156,10 +143,6 @@ impl ReconcilerConfig {
|
||||
self.secondary_download_request_timeout
|
||||
.unwrap_or(SECONDARY_DOWNLOAD_REQUEST_TIMEOUT_DEFAULT)
|
||||
}
|
||||
|
||||
pub(crate) fn tenant_creation_hint(&self) -> bool {
|
||||
self.tenant_creation_hint
|
||||
}
|
||||
}
|
||||
|
||||
/// RAII resource units granted to a Reconciler, which it should keep alive until it finishes doing I/O
|
||||
@@ -951,35 +934,16 @@ impl Reconciler {
|
||||
)
|
||||
.await;
|
||||
if let Err(e) = &result {
|
||||
// Set this flag so that in our ReconcileResult we will set the flag on the shard that it
|
||||
// needs to retry at some point.
|
||||
self.compute_notify_failure = true;
|
||||
|
||||
// It is up to the caller whether they want to drop out on this error, but they don't have to:
|
||||
// in general we should avoid letting unavailability of the cloud control plane stop us from
|
||||
// making progress.
|
||||
match e {
|
||||
// 404s from cplane during tenant creation are expected.
|
||||
// Cplane only persists the shards to the database after
|
||||
// creating the tenant and the timeline. If we notify before
|
||||
// that, we'll get a 404.
|
||||
//
|
||||
// This is fine because tenant creations happen via /location_config
|
||||
// and that returns the list of locations in the response. Hence, we
|
||||
// silence the error and return Ok(()) here. Reconciliation will still
|
||||
// be retried because we set [`Reconciler::compute_notify_failure`] above.
|
||||
NotifyError::Unexpected(hyper::StatusCode::NOT_FOUND)
|
||||
if self.reconciler_config.tenant_creation_hint() =>
|
||||
{
|
||||
return Ok(());
|
||||
}
|
||||
NotifyError::ShuttingDown => {}
|
||||
_ => {
|
||||
tracing::warn!(
|
||||
"Failed to notify compute of attached pageserver {node}: {e}"
|
||||
);
|
||||
}
|
||||
if !matches!(e, NotifyError::ShuttingDown) {
|
||||
tracing::warn!("Failed to notify compute of attached pageserver {node}: {e}");
|
||||
}
|
||||
|
||||
// Set this flag so that in our ReconcileResult we will set the flag on the shard that it
|
||||
// needs to retry at some point.
|
||||
self.compute_notify_failure = true;
|
||||
}
|
||||
result
|
||||
} else {
|
||||
|
||||
@@ -2238,14 +2238,9 @@ impl Service {
|
||||
let waiters = {
|
||||
let mut locked = self.inner.write().unwrap();
|
||||
let (nodes, tenants, _scheduler) = locked.parts_mut();
|
||||
let config = ReconcilerConfigBuilder::new()
|
||||
.tenant_creation_hint(true)
|
||||
.build();
|
||||
tenants
|
||||
.range_mut(TenantShardId::tenant_range(tenant_id))
|
||||
.filter_map(|(_shard_id, shard)| {
|
||||
self.maybe_configured_reconcile_shard(shard, nodes, config)
|
||||
})
|
||||
.filter_map(|(_shard_id, shard)| self.maybe_reconcile_shard(shard, nodes))
|
||||
.collect::<Vec<_>>()
|
||||
};
|
||||
|
||||
|
||||
@@ -707,7 +707,6 @@ impl TenantShard {
|
||||
if let Some(node_id) = self.intent.get_attached() {
|
||||
// Populate secondary by demoting the attached node
|
||||
self.intent.demote_attached(scheduler, *node_id);
|
||||
|
||||
modified = true;
|
||||
} else if self.intent.secondary.is_empty() {
|
||||
// Populate secondary by scheduling a fresh node
|
||||
@@ -980,51 +979,24 @@ impl TenantShard {
|
||||
),
|
||||
)
|
||||
})
|
||||
.collect::<HashMap<_, _>>();
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
if secondary_scores.iter().any(|score| score.1.is_none()) {
|
||||
// Trivial case: if we only have one secondary, drop that one
|
||||
if self.intent.get_secondary().len() == 1 {
|
||||
return Some(ScheduleOptimization {
|
||||
sequence: self.sequence,
|
||||
action: ScheduleOptimizationAction::RemoveSecondary(
|
||||
*self.intent.get_secondary().first().unwrap(),
|
||||
),
|
||||
});
|
||||
}
|
||||
|
||||
// Try to find a "good" secondary to keep, without relying on scores (one or more nodes is in a state
|
||||
// where its score can't be calculated), and drop the others. This enables us to make progress in
|
||||
// most cases, even if some nodes are offline or have scheduling=pause set.
|
||||
|
||||
debug_assert!(self.intent.attached.is_some()); // We should not make it here unless attached -- this
|
||||
// logic presumes we are in a mode where we want secondaries to be in non-home AZ
|
||||
if let Some(retain_secondary) = self.intent.get_secondary().iter().find(|n| {
|
||||
let in_home_az = scheduler.get_node_az(n) == self.intent.preferred_az_id;
|
||||
let is_available = secondary_scores
|
||||
.get(n)
|
||||
.expect("Built from same list of nodes")
|
||||
.is_some();
|
||||
is_available && !in_home_az
|
||||
}) {
|
||||
// Great, we found one to retain. Pick some other to drop.
|
||||
if let Some(victim) = self
|
||||
.intent
|
||||
.get_secondary()
|
||||
.iter()
|
||||
.find(|n| n != &retain_secondary)
|
||||
{
|
||||
// Don't have full list of scores, so can't make a good decision about which to drop unless
|
||||
// there is an obvious one in the wrong AZ
|
||||
for secondary in self.intent.get_secondary() {
|
||||
if scheduler.get_node_az(secondary) == self.intent.preferred_az_id {
|
||||
return Some(ScheduleOptimization {
|
||||
sequence: self.sequence,
|
||||
action: ScheduleOptimizationAction::RemoveSecondary(*victim),
|
||||
action: ScheduleOptimizationAction::RemoveSecondary(*secondary),
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
// Fall through: we didn't identify one to remove. This ought to be rare.
|
||||
tracing::warn!("Keeping extra secondaries: can't determine which of {:?} to remove (some nodes offline?)",
|
||||
self.intent.get_secondary()
|
||||
);
|
||||
self.intent.get_secondary()
|
||||
);
|
||||
} else {
|
||||
let victim = secondary_scores
|
||||
.iter()
|
||||
@@ -1033,7 +1005,7 @@ impl TenantShard {
|
||||
.0;
|
||||
return Some(ScheduleOptimization {
|
||||
sequence: self.sequence,
|
||||
action: ScheduleOptimizationAction::RemoveSecondary(*victim),
|
||||
action: ScheduleOptimizationAction::RemoveSecondary(victim),
|
||||
});
|
||||
}
|
||||
}
|
||||
@@ -2407,110 +2379,6 @@ pub(crate) mod tests {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Test how the optimisation code behaves with an extra secondary
|
||||
#[test]
|
||||
fn optimize_removes_secondary() -> anyhow::Result<()> {
|
||||
let az_a_tag = AvailabilityZone("az-a".to_string());
|
||||
let az_b_tag = AvailabilityZone("az-b".to_string());
|
||||
let mut nodes = make_test_nodes(
|
||||
4,
|
||||
&[
|
||||
az_a_tag.clone(),
|
||||
az_b_tag.clone(),
|
||||
az_a_tag.clone(),
|
||||
az_b_tag.clone(),
|
||||
],
|
||||
);
|
||||
let mut scheduler = Scheduler::new(nodes.values());
|
||||
|
||||
let mut schedule_context = ScheduleContext::default();
|
||||
|
||||
let mut shard_a = make_test_tenant_shard(PlacementPolicy::Attached(1));
|
||||
shard_a.intent.preferred_az_id = Some(az_a_tag.clone());
|
||||
shard_a
|
||||
.schedule(&mut scheduler, &mut schedule_context)
|
||||
.unwrap();
|
||||
|
||||
// Attached on node 1, secondary on node 2
|
||||
assert_eq!(shard_a.intent.get_attached(), &Some(NodeId(1)));
|
||||
assert_eq!(shard_a.intent.get_secondary(), &vec![NodeId(2)]);
|
||||
|
||||
// Initially optimiser is idle
|
||||
assert_eq!(
|
||||
shard_a.optimize_attachment(&mut scheduler, &schedule_context),
|
||||
None
|
||||
);
|
||||
assert_eq!(
|
||||
shard_a.optimize_secondary(&mut scheduler, &schedule_context),
|
||||
None
|
||||
);
|
||||
|
||||
// A spare secondary in the home AZ: it should be removed -- this is the situation when we're midway through a graceful migration, after cutting over
|
||||
// to our new location
|
||||
shard_a.intent.push_secondary(&mut scheduler, NodeId(3));
|
||||
let optimization = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
|
||||
assert_eq!(
|
||||
optimization,
|
||||
Some(ScheduleOptimization {
|
||||
sequence: shard_a.sequence,
|
||||
action: ScheduleOptimizationAction::RemoveSecondary(NodeId(3))
|
||||
})
|
||||
);
|
||||
shard_a.apply_optimization(&mut scheduler, optimization.unwrap());
|
||||
|
||||
// A spare secondary in the non-home AZ, and one of them is offline
|
||||
shard_a.intent.push_secondary(&mut scheduler, NodeId(4));
|
||||
nodes
|
||||
.get_mut(&NodeId(4))
|
||||
.unwrap()
|
||||
.set_availability(NodeAvailability::Offline);
|
||||
scheduler.node_upsert(nodes.get(&NodeId(4)).unwrap());
|
||||
let optimization = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
|
||||
assert_eq!(
|
||||
optimization,
|
||||
Some(ScheduleOptimization {
|
||||
sequence: shard_a.sequence,
|
||||
action: ScheduleOptimizationAction::RemoveSecondary(NodeId(4))
|
||||
})
|
||||
);
|
||||
shard_a.apply_optimization(&mut scheduler, optimization.unwrap());
|
||||
|
||||
// A spare secondary when should have none
|
||||
shard_a.policy = PlacementPolicy::Attached(0);
|
||||
let optimization = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
|
||||
assert_eq!(
|
||||
optimization,
|
||||
Some(ScheduleOptimization {
|
||||
sequence: shard_a.sequence,
|
||||
action: ScheduleOptimizationAction::RemoveSecondary(NodeId(2))
|
||||
})
|
||||
);
|
||||
shard_a.apply_optimization(&mut scheduler, optimization.unwrap());
|
||||
assert_eq!(shard_a.intent.get_attached(), &Some(NodeId(1)));
|
||||
assert_eq!(shard_a.intent.get_secondary(), &vec![]);
|
||||
|
||||
// Check that in secondary mode, we preserve the secondary in the preferred AZ
|
||||
let mut schedule_context = ScheduleContext::default(); // Fresh context, we're about to call schedule()
|
||||
shard_a.policy = PlacementPolicy::Secondary;
|
||||
shard_a
|
||||
.schedule(&mut scheduler, &mut schedule_context)
|
||||
.unwrap();
|
||||
assert_eq!(shard_a.intent.get_attached(), &None);
|
||||
assert_eq!(shard_a.intent.get_secondary(), &vec![NodeId(1)]);
|
||||
assert_eq!(
|
||||
shard_a.optimize_attachment(&mut scheduler, &schedule_context),
|
||||
None
|
||||
);
|
||||
assert_eq!(
|
||||
shard_a.optimize_secondary(&mut scheduler, &schedule_context),
|
||||
None
|
||||
);
|
||||
|
||||
shard_a.intent.clear(&mut scheduler);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// Optimize til quiescent: this emulates what Service::optimize_all does, when
|
||||
// called repeatedly in the background.
|
||||
// Returns the applied optimizations
|
||||
|
||||
@@ -2105,7 +2105,7 @@ class NeonStorageController(MetricsGetter, LogUtils):
|
||||
log.info(f"reconcile_all waited for {n} shards")
|
||||
return n
|
||||
|
||||
def reconcile_until_idle(self, timeout_secs=30, max_interval=5):
|
||||
def reconcile_until_idle(self, timeout_secs=30, max_interval=1):
|
||||
start_at = time.time()
|
||||
n = 1
|
||||
delay_sec = 0.1
|
||||
@@ -2766,11 +2766,6 @@ class NeonPageserver(PgProtocol, LogUtils):
|
||||
log.error(f"Failed to decode LocationConf, raw content ({len(bytes)} bytes): {bytes}")
|
||||
raise
|
||||
|
||||
def heatmap_content(self, tenant_shard_id: TenantId | TenantShardId) -> Any:
|
||||
path = self.tenant_dir(tenant_shard_id) / "heatmap-v1.json"
|
||||
with open(path) as f:
|
||||
return json.load(f)
|
||||
|
||||
def tenant_create(
|
||||
self,
|
||||
tenant_id: TenantId,
|
||||
|
||||
@@ -34,20 +34,16 @@ def test_layer_map(neon_env_builder: NeonEnvBuilder, zenbenchmark):
|
||||
cur.execute("set log_statement = 'all'")
|
||||
cur.execute("create table t(x integer)")
|
||||
for _ in range(n_iters):
|
||||
with zenbenchmark.record_duration(f"insert into t values (generate_series(1,{n_records}))"):
|
||||
cur.execute(f"insert into t values (generate_series(1,{n_records}))")
|
||||
cur.execute(f"insert into t values (generate_series(1,{n_records}))")
|
||||
time.sleep(1)
|
||||
|
||||
with zenbenchmark.record_duration("vacuum t"):
|
||||
cur.execute("vacuum t")
|
||||
cur.execute("vacuum t")
|
||||
|
||||
with zenbenchmark.record_duration("SELECT count(*) from t"):
|
||||
with zenbenchmark.record_duration("test_query"):
|
||||
cur.execute("SELECT count(*) from t")
|
||||
assert cur.fetchone() == (n_iters * n_records,)
|
||||
|
||||
with zenbenchmark.record_duration("flush_ep_to_pageserver"):
|
||||
flush_ep_to_pageserver(env, endpoint, tenant, timeline)
|
||||
with zenbenchmark.record_duration("timeline_checkpoint"):
|
||||
env.pageserver.http_client().timeline_checkpoint(
|
||||
tenant, timeline, compact=False, wait_until_uploaded=True
|
||||
)
|
||||
flush_ep_to_pageserver(env, endpoint, tenant, timeline)
|
||||
env.pageserver.http_client().timeline_checkpoint(
|
||||
tenant, timeline, compact=False, wait_until_uploaded=True
|
||||
)
|
||||
|
||||
@@ -29,21 +29,6 @@ AGGRESSIVE_COMPACTION_TENANT_CONF = {
|
||||
# "lsn_lease_length": "0s", -- TODO: would cause branch creation errors, should fix later
|
||||
}
|
||||
|
||||
PREEMPT_COMPACTION_TENANT_CONF = {
|
||||
"gc_period": "5s",
|
||||
"compaction_period": "5s",
|
||||
# Small checkpoint distance to create many layers
|
||||
"checkpoint_distance": 1024**2,
|
||||
# Compact small layers
|
||||
"compaction_target_size": 1024**2,
|
||||
"image_creation_threshold": 1,
|
||||
"image_creation_preempt_threshold": 1,
|
||||
# compact more frequently
|
||||
"compaction_threshold": 3,
|
||||
"compaction_upper_limit": 6,
|
||||
"lsn_lease_length": "0s",
|
||||
}
|
||||
|
||||
|
||||
@skip_in_debug_build("only run with release build")
|
||||
@pytest.mark.parametrize(
|
||||
@@ -51,8 +36,7 @@ PREEMPT_COMPACTION_TENANT_CONF = {
|
||||
[PageserverWalReceiverProtocol.VANILLA, PageserverWalReceiverProtocol.INTERPRETED],
|
||||
)
|
||||
def test_pageserver_compaction_smoke(
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
wal_receiver_protocol: PageserverWalReceiverProtocol,
|
||||
neon_env_builder: NeonEnvBuilder, wal_receiver_protocol: PageserverWalReceiverProtocol
|
||||
):
|
||||
"""
|
||||
This is a smoke test that compaction kicks in. The workload repeatedly churns
|
||||
@@ -70,8 +54,7 @@ def test_pageserver_compaction_smoke(
|
||||
page_cache_size=10
|
||||
"""
|
||||
|
||||
conf = AGGRESSIVE_COMPACTION_TENANT_CONF.copy()
|
||||
env = neon_env_builder.init_start(initial_tenant_conf=conf)
|
||||
env = neon_env_builder.init_start(initial_tenant_conf=AGGRESSIVE_COMPACTION_TENANT_CONF)
|
||||
|
||||
tenant_id = env.initial_tenant
|
||||
timeline_id = env.initial_timeline
|
||||
@@ -130,41 +113,6 @@ page_cache_size=10
|
||||
assert vectored_average < 8
|
||||
|
||||
|
||||
@skip_in_debug_build("only run with release build")
|
||||
def test_pageserver_compaction_preempt(
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
):
|
||||
# Ideally we should be able to do unit tests for this, but we need real Postgres
|
||||
# WALs in order to do unit testing...
|
||||
|
||||
conf = PREEMPT_COMPACTION_TENANT_CONF.copy()
|
||||
env = neon_env_builder.init_start(initial_tenant_conf=conf)
|
||||
|
||||
tenant_id = env.initial_tenant
|
||||
timeline_id = env.initial_timeline
|
||||
|
||||
row_count = 200000
|
||||
churn_rounds = 10
|
||||
|
||||
ps_http = env.pageserver.http_client()
|
||||
|
||||
workload = Workload(env, tenant_id, timeline_id)
|
||||
workload.init(env.pageserver.id)
|
||||
|
||||
log.info("Writing initial data ...")
|
||||
workload.write_rows(row_count, env.pageserver.id)
|
||||
|
||||
for i in range(1, churn_rounds + 1):
|
||||
log.info(f"Running churn round {i}/{churn_rounds} ...")
|
||||
workload.churn_rows(row_count, env.pageserver.id, upload=False)
|
||||
workload.validate(env.pageserver.id)
|
||||
ps_http.timeline_compact(tenant_id, timeline_id, wait_until_uploaded=True)
|
||||
log.info("Validating at workload end ...")
|
||||
workload.validate(env.pageserver.id)
|
||||
# ensure image layer creation gets preempted and then resumed
|
||||
env.pageserver.assert_log_contains("resuming image layer creation")
|
||||
|
||||
|
||||
@skip_in_debug_build("only run with release build")
|
||||
@pytest.mark.parametrize(
|
||||
"with_branches",
|
||||
@@ -302,9 +250,6 @@ def test_pageserver_gc_compaction_idempotent(
|
||||
workload.churn_rows(row_count, env.pageserver.id)
|
||||
# compact 3 times if mode is before_restart
|
||||
n_compactions = 3 if compaction_mode == "before_restart" else 1
|
||||
ps_http.timeline_compact(
|
||||
tenant_id, timeline_id, force_l0_compaction=True, wait_until_uploaded=True
|
||||
)
|
||||
for _ in range(n_compactions):
|
||||
# Force refresh gc info to have gc_cutoff generated
|
||||
ps_http.timeline_gc(tenant_id, timeline_id, None)
|
||||
|
||||
@@ -95,8 +95,6 @@ def test_remote_extensions(
|
||||
|
||||
# mock remote_extensions spec
|
||||
spec: dict[str, Any] = {
|
||||
"public_extensions": ["anon"],
|
||||
"custom_extensions": None,
|
||||
"library_index": {
|
||||
"anon": "anon",
|
||||
},
|
||||
|
||||
@@ -443,7 +443,7 @@ def test_heatmap_uploads(neon_env_builder: NeonEnvBuilder):
|
||||
workload.write_rows(256, env.pageservers[0].id)
|
||||
env.pageserver.http_client().tenant_heatmap_upload(tenant_id)
|
||||
|
||||
def validate_heatmap(heatmap, on_disk_heatmap):
|
||||
def validate_heatmap(heatmap):
|
||||
assert len(heatmap["timelines"]) == 1
|
||||
assert heatmap["timelines"][0]["timeline_id"] == str(timeline_id)
|
||||
assert len(heatmap["timelines"][0]["layers"]) > 0
|
||||
@@ -452,13 +452,10 @@ def test_heatmap_uploads(neon_env_builder: NeonEnvBuilder):
|
||||
# Each layer appears at most once
|
||||
assert len(set(layer["name"] for layer in layers)) == len(layers)
|
||||
|
||||
assert heatmap == on_disk_heatmap
|
||||
|
||||
# Download and inspect the heatmap that the pageserver uploaded
|
||||
heatmap_first = env.pageserver_remote_storage.heatmap_content(tenant_id)
|
||||
heatmap_first_on_disk = env.pageserver.heatmap_content(tenant_id)
|
||||
log.info(f"Read back heatmap: {heatmap_first}")
|
||||
validate_heatmap(heatmap_first, heatmap_first_on_disk)
|
||||
validate_heatmap(heatmap_first)
|
||||
|
||||
# Do some more I/O to generate more layers
|
||||
workload.churn_rows(64, env.pageservers[0].id)
|
||||
@@ -466,10 +463,9 @@ def test_heatmap_uploads(neon_env_builder: NeonEnvBuilder):
|
||||
|
||||
# Ensure that another heatmap upload includes the new layers
|
||||
heatmap_second = env.pageserver_remote_storage.heatmap_content(tenant_id)
|
||||
heatmap_second_on_disk = env.pageserver.heatmap_content(tenant_id)
|
||||
log.info(f"Read back heatmap: {heatmap_second}")
|
||||
assert heatmap_second != heatmap_first
|
||||
validate_heatmap(heatmap_second, heatmap_second_on_disk)
|
||||
validate_heatmap(heatmap_second)
|
||||
|
||||
|
||||
def list_elegible_layers(
|
||||
|
||||
@@ -92,7 +92,6 @@ tonic = { version = "0.12", default-features = false, features = ["codegen", "pr
|
||||
tower = { version = "0.4", default-features = false, features = ["balance", "buffer", "limit", "util"] }
|
||||
tracing = { version = "0.1", features = ["log"] }
|
||||
tracing-core = { version = "0.1" }
|
||||
tracing-log = { version = "0.2" }
|
||||
url = { version = "2", features = ["serde"] }
|
||||
zerocopy = { version = "0.7", features = ["derive", "simd"] }
|
||||
zeroize = { version = "1", features = ["derive", "serde"] }
|
||||
|
||||
Reference in New Issue
Block a user