Compare commits

...

10 Commits

Author SHA1 Message Date
Conrad Ludgate
9a0d3e84d5 apply envfilter only to export layer 2025-02-05 12:11:28 +00:00
Conrad Ludgate
57d51d581d fix env 2025-02-05 11:26:10 +00:00
Conrad Ludgate
47c15899c9 feat: experiment with tokio-console 2025-02-05 10:05:12 +00:00
Fedor Dikarev
472007dd7c ci: unify Dockerfiles, set bash as SHELL for debian layers, make cpan step as separate RUN (#10645)
## Problem
Ref: https://github.com/neondatabase/cloud/issues/23461

and follow-up after: https://github.com/neondatabase/neon/pull/10553

we used `echo` to set-up `.wgetrc` and `.curlrc`, and there we used `\n`
to make these multiline configs with one echo command.

The problem is that Debian `/bin/sh`'s built-in echo command behaves
differently from the `/bin/echo` executable and from the `echo` built-in
in `bash`. Namely, it does not support the`-e` option, and while it does
treat `\n` as a newline, passing `-e` here will add that `-e` to the
output.
At the same time, when we use different base images, for example
`alpine/curl`, their `/bin/sh` supports and requires `-e` for treating
escape sequences like `\n`.
But having different `echo` and remembering difference in their
behaviour isn't best experience for the developer and makes bad
experience maintaining Dockerfiles.

Work-arounds:

- Explicitly use `/bin/bash` (like in this PR)
- Use `/bin/echo` instead of the shell's built-in echo function
- Use printf "foo\n" instead of echo -e "foo\n"

## Summary of changes
1. To fix that, we process with the option setting `/bin/bash` as a
SHELL for the debian-baysed layers
2. With no changes for `alpine/curl` based layers.
3. And one more change here: in `extensions` layer split to the 2 steps:
installing dependencies from `CPAN` and installing `lcov` from github,
so upgrading `lcov` could reuse previous layer with installed cpan
modules.
2025-02-04 18:58:02 +00:00
Vlad Lazar
f9009d6b80 pageserver: write heatmap to disk after uploading it (#10650)
## Problem

We wish to make heatmap generation additive in
https://github.com/neondatabase/neon/pull/10597.
However, if the pageserver restarts and has a heatmap on disk from when
it was a secondary long ago,
we can end up keeping extra layers on the secondary's disk.

## Summary of changes

Persist the heatmap after a successful upload.
2025-02-04 17:52:54 +00:00
Alex Chi Z.
cab60b6d9f fix(pagesever): stablize gc-compaction tests (#10621)
## Problem

Hopefully this can resolve
https://github.com/neondatabase/neon/issues/10517. The reason why the
test is flaky is that after restart the compute node might write some
data so that the pageserver flush some layers, and in the end, causing
L0 compaction to run, and we cannot get the test scenario as we want.

## Summary of changes

Ensure all L0 layers are compacted before starting the test.

Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-02-04 16:11:31 +00:00
Erik Grinaker
06090bbccd pageserver: log critical error on ClearVmBits for unknown pages (#10634)
## Problem

In #9895, we fixed some issues where `ClearVmBits` were broadcast to all
shards, even those not owning the VM relation. As part of that, we found
some ancient code from #1417, which discarded spurious incorrect
`ClearVmBits` records for pages outside of the VM relation. We added
observability in #9911 to see how often this actually happens in the
wild.

After two months, we have not seen this happen once in production or
staging. However, out of caution, we don't want a hard error and break
WAL ingestion.

Resolves #10067.

## Summary of changes

Log a critical error when ingesting `ClearVmBits` for unknown VM
relations or pages.
2025-02-04 14:55:11 +00:00
Folke Behrens
dcf335a251 proxy: Switch proxy to JSON logging (#9857)
## Problem

We want to switch proxy and ideally all Rust services to structured JSON
logging to support better filtering and cross-referencing with tracing.

## Summary of changes

* Introduce a custom tracing-subscriber to write the JSON. In a first
attempt a customized tracing::fmt::FmtSubscriber was used, but it's very
inefficient and can still generate invalid JSON. It's also doesn't allow
us to add important fields to the root object.
* Make this opt in: the `LOGFMT` env var can be set to `"json"` to
enable to new logger at startup.
2025-02-04 14:50:53 +00:00
Arpad Müller
b6e9daea9a storcon: only allow errrors of the server cert verification (#10644)
This PR does a bunch of things:

* only allow errors of the server cert verification, not of the TLS
handshake. The TLS handshake doesn't cause any errors for us so we can
just always require it to be valid. This simplifies the code a little.
* As the solution is more permanent than originally anticipated, I think
it makes sense to move the `AcceptAll` verifier outside.
* log the connstr information. this helps with figuring out which domain
names are configured in the connstr, etc. I think it is generally useful
to print it. make extra sure that the password is not leaked.

Follow-up of #10640
2025-02-04 14:01:57 +00:00
a-masterov
d5c3a4e2b9 Add support for pgjwt test (#10611)
## Problem
We don't currently test pgjwt, while it is based on pg_prove and can be
easily added
## Summary of changes
The test for pgjwt was added.
2025-02-04 13:49:44 +00:00
22 changed files with 1360 additions and 185 deletions

View File

@@ -3,15 +3,18 @@
# by the RUSTDOCFLAGS env var in CI.
rustdocflags = ["-Arustdoc::private_intra_doc_links"]
# Enable frame pointers. This may have a minor performance overhead, but makes it easier and more
# efficient to obtain stack traces (and thus CPU/heap profiles). It may also avoid seg faults that
# we've seen with libunwind-based profiling. See also:
#
# * <https://www.brendangregg.com/blog/2024-03-17/the-return-of-the-frame-pointers.html>
# * <https://github.com/rust-lang/rust/pull/122646>
#
# NB: the RUSTFLAGS envvar will replace this. Make sure to update e.g. Dockerfile as well.
rustflags = ["-Cforce-frame-pointers=yes"]
rustflags = [
# Enable frame pointers. This may have a minor performance overhead, but makes it easier and more
# efficient to obtain stack traces (and thus CPU/heap profiles). It may also avoid seg faults that
# we've seen with libunwind-based profiling. See also:
#
# * <https://www.brendangregg.com/blog/2024-03-17/the-return-of-the-frame-pointers.html>
# * <https://github.com/rust-lang/rust/pull/122646>
"-Cforce-frame-pointers=yes",
# Enable tokio_unstable to enable tokio-console support
"--cfg=tokio_unstable"
]
[alias]
build_testing = ["build", "--features", "testing"]

208
Cargo.lock generated
View File

@@ -206,6 +206,16 @@ dependencies = [
"syn 2.0.90",
]
[[package]]
name = "assert-json-diff"
version = "2.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "47e4f2b81832e72834d7518d8487a0396a28cc408186a2e8854c0f98011faf12"
dependencies = [
"serde",
"serde_json",
]
[[package]]
name = "async-channel"
version = "1.9.0"
@@ -707,13 +717,40 @@ dependencies = [
"tracing",
]
[[package]]
name = "axum"
version = "0.7.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "edca88bc138befd0323b20752846e6587272d3b03b0343c8ea28a6f819e6e71f"
dependencies = [
"async-trait",
"axum-core 0.4.5",
"bytes",
"futures-util",
"http 1.1.0",
"http-body 1.0.0",
"http-body-util",
"itoa",
"matchit 0.7.3",
"memchr",
"mime",
"percent-encoding",
"pin-project-lite",
"rustversion",
"serde",
"sync_wrapper 1.0.1",
"tower 0.5.2",
"tower-layer",
"tower-service",
]
[[package]]
name = "axum"
version = "0.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6d6fd624c75e18b3b4c6b9caf42b1afe24437daaee904069137d8bab077be8b8"
dependencies = [
"axum-core",
"axum-core 0.5.0",
"base64 0.22.1",
"bytes",
"form_urlencoded",
@@ -724,7 +761,7 @@ dependencies = [
"hyper 1.4.1",
"hyper-util",
"itoa",
"matchit",
"matchit 0.8.4",
"memchr",
"mime",
"percent-encoding",
@@ -744,6 +781,26 @@ dependencies = [
"tracing",
]
[[package]]
name = "axum-core"
version = "0.4.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "09f2bd6146b97ae3359fa0cc6d6b376d9539582c7b4220f041a33ec24c226199"
dependencies = [
"async-trait",
"bytes",
"futures-util",
"http 1.1.0",
"http-body 1.0.0",
"http-body-util",
"mime",
"pin-project-lite",
"rustversion",
"sync_wrapper 1.0.1",
"tower-layer",
"tower-service",
]
[[package]]
name = "axum-core"
version = "0.5.0"
@@ -1010,6 +1067,12 @@ dependencies = [
"generic-array",
]
[[package]]
name = "boxcar"
version = "0.2.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2721c3c5a6f0e7f7e607125d963fedeb765f545f67adc9d71ed934693881eb42"
[[package]]
name = "bstr"
version = "1.5.0"
@@ -1283,7 +1346,7 @@ dependencies = [
"aws-config",
"aws-sdk-kms",
"aws-sdk-s3",
"axum",
"axum 0.8.1",
"base64 0.13.1",
"bytes",
"camino",
@@ -1321,7 +1384,7 @@ dependencies = [
"tokio-stream",
"tokio-util",
"tower 0.5.2",
"tower-http",
"tower-http 0.6.2",
"tracing",
"tracing-opentelemetry",
"tracing-subscriber",
@@ -1343,6 +1406,47 @@ dependencies = [
"crossbeam-utils",
]
[[package]]
name = "console-api"
version = "0.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8030735ecb0d128428b64cd379809817e620a40e5001c54465b99ec5feec2857"
dependencies = [
"futures-core",
"prost",
"prost-types",
"tonic",
"tracing-core",
]
[[package]]
name = "console-subscriber"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6539aa9c6a4cd31f4b1c040f860a1eac9aa80e7df6b05d506a6e7179936d6a01"
dependencies = [
"console-api",
"crossbeam-channel",
"crossbeam-utils",
"futures-task",
"hdrhistogram",
"humantime",
"hyper-util",
"parking_lot 0.12.1",
"prost",
"prost-types",
"serde",
"serde_json",
"thread_local",
"tokio",
"tokio-stream",
"tonic",
"tonic-web",
"tracing",
"tracing-core",
"tracing-subscriber",
]
[[package]]
name = "const-oid"
version = "0.9.6"
@@ -2433,6 +2537,16 @@ dependencies = [
"wasm-bindgen",
]
[[package]]
name = "gettid"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "397256552fed4a9e577850498071831ec8f18ea83368aecc114cab469dcb43e5"
dependencies = [
"libc",
"winapi",
]
[[package]]
name = "gimli"
version = "0.31.1"
@@ -3409,6 +3523,12 @@ dependencies = [
"regex-automata 0.1.10",
]
[[package]]
name = "matchit"
version = "0.7.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94"
[[package]]
name = "matchit"
version = "0.8.4"
@@ -4212,6 +4332,16 @@ dependencies = [
"workspace_hack",
]
[[package]]
name = "papaya"
version = "0.1.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dc7c76487f7eaa00a0fc1d7f88dc6b295aec478d11b0fc79f857b62c2874124c"
dependencies = [
"equivalent",
"seize",
]
[[package]]
name = "parking"
version = "2.1.1"
@@ -4839,6 +4969,7 @@ dependencies = [
"ahash",
"anyhow",
"arc-swap",
"assert-json-diff",
"async-compression",
"async-trait",
"atomic-take",
@@ -4846,6 +4977,7 @@ dependencies = [
"aws-sdk-iam",
"aws-sigv4",
"base64 0.13.1",
"boxcar",
"bstr",
"bytes",
"camino",
@@ -4854,6 +4986,7 @@ dependencies = [
"clap",
"clashmap",
"compute_api",
"console-subscriber",
"consumption_metrics",
"ecdsa 0.16.9",
"ed25519-dalek",
@@ -4862,6 +4995,7 @@ dependencies = [
"flate2",
"framed-websockets",
"futures",
"gettid",
"hashbrown 0.14.5",
"hashlink",
"hex",
@@ -4884,7 +5018,9 @@ dependencies = [
"measured",
"metrics",
"once_cell",
"opentelemetry",
"p256 0.13.2",
"papaya",
"parking_lot 0.12.1",
"parquet",
"parquet_derive",
@@ -4931,6 +5067,9 @@ dependencies = [
"tokio-tungstenite 0.21.0",
"tokio-util",
"tracing",
"tracing-log",
"tracing-opentelemetry",
"tracing-serde",
"tracing-subscriber",
"tracing-utils",
"try-lock",
@@ -5360,7 +5499,7 @@ dependencies = [
"async-trait",
"getrandom 0.2.11",
"http 1.1.0",
"matchit",
"matchit 0.8.4",
"opentelemetry",
"reqwest",
"reqwest-middleware",
@@ -5884,6 +6023,16 @@ dependencies = [
"libc",
]
[[package]]
name = "seize"
version = "0.4.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d84b0c858bdd30cb56f5597f8b3bf702ec23829e652cc636a1e5a7b9de46ae93"
dependencies = [
"libc",
"windows-sys 0.52.0",
]
[[package]]
name = "semver"
version = "1.0.17"
@@ -6839,6 +6988,7 @@ dependencies = [
"signal-hook-registry",
"socket2",
"tokio-macros",
"tracing",
"windows-sys 0.52.0",
]
@@ -7078,9 +7228,12 @@ version = "0.12.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "877c5b330756d856ffcc4553ab34a5684481ade925ecc54bcd1bf02b1d0d4d52"
dependencies = [
"async-stream",
"async-trait",
"axum 0.7.9",
"base64 0.22.1",
"bytes",
"h2 0.4.4",
"http 1.1.0",
"http-body 1.0.0",
"http-body-util",
@@ -7092,6 +7245,7 @@ dependencies = [
"prost",
"rustls-native-certs 0.8.0",
"rustls-pemfile 2.1.1",
"socket2",
"tokio",
"tokio-rustls 0.26.0",
"tokio-stream",
@@ -7115,6 +7269,26 @@ dependencies = [
"syn 2.0.90",
]
[[package]]
name = "tonic-web"
version = "0.12.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5299dd20801ad736dccb4a5ea0da7376e59cd98f213bf1c3d478cf53f4834b58"
dependencies = [
"base64 0.22.1",
"bytes",
"http 1.1.0",
"http-body 1.0.0",
"http-body-util",
"pin-project",
"tokio-stream",
"tonic",
"tower-http 0.5.2",
"tower-layer",
"tower-service",
"tracing",
]
[[package]]
name = "tower"
version = "0.4.13"
@@ -7151,6 +7325,22 @@ dependencies = [
"tracing",
]
[[package]]
name = "tower-http"
version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e9cd434a998747dd2c4276bc96ee2e0c7a2eadf3cae88e52be55a05fa9053f5"
dependencies = [
"bitflags 2.8.0",
"bytes",
"http 1.1.0",
"http-body 1.0.0",
"http-body-util",
"pin-project-lite",
"tower-layer",
"tower-service",
]
[[package]]
name = "tower-http"
version = "0.6.2"
@@ -7281,6 +7471,7 @@ checksum = "e8189decb5ac0fa7bc8b96b7cb9b2701d60d48805aca84a238004d665fcc4008"
dependencies = [
"matchers",
"once_cell",
"parking_lot 0.12.1",
"regex",
"serde",
"serde_json",
@@ -7583,7 +7774,7 @@ name = "vm_monitor"
version = "0.1.0"
dependencies = [
"anyhow",
"axum",
"axum 0.8.1",
"cgroups-rs",
"clap",
"futures",
@@ -8072,6 +8263,7 @@ dependencies = [
"chrono",
"clap",
"clap_builder",
"crossbeam-utils",
"crypto-bigint 0.5.5",
"der 0.7.8",
"deranged",
@@ -8088,6 +8280,7 @@ dependencies = [
"getrandom 0.2.11",
"half",
"hashbrown 0.14.5",
"hdrhistogram",
"hex",
"hmac",
"hyper 0.14.30",
@@ -8143,8 +8336,11 @@ dependencies = [
"toml_edit",
"tonic",
"tower 0.4.13",
"tower 0.5.2",
"tracing",
"tracing-core",
"tracing-log",
"tracing-subscriber",
"url",
"zerocopy",
"zeroize",

View File

@@ -54,6 +54,7 @@ async-compression = { version = "0.4.0", features = ["tokio", "gzip", "zstd"] }
atomic-take = "1.1.0"
backtrace = "0.3.74"
flate2 = "1.0.26"
assert-json-diff = "2"
async-stream = "0.3"
async-trait = "0.1"
aws-config = { version = "1.5", default-features = false, features=["rustls", "sso"] }
@@ -79,6 +80,7 @@ chrono = { version = "0.4", default-features = false, features = ["clock"] }
clap = { version = "4.0", features = ["derive", "env"] }
clashmap = { version = "1.0", features = ["raw-api"] }
comfy-table = "7.1"
console-subscriber = { version = "0.4.1", features = ["parking_lot", "grpc-web"] }
const_format = "0.2"
crc32c = "0.6"
diatomic-waker = { version = "0.2.3" }
@@ -193,7 +195,9 @@ tower-http = { version = "0.6.2", features = ["request-id", "trace"] }
tower-service = "0.3.3"
tracing = "0.1"
tracing-error = "0.2"
tracing-log = "0.2"
tracing-opentelemetry = "0.28"
tracing-serde = "0.2.0"
tracing-subscriber = { version = "0.3", default-features = false, features = ["smallvec", "fmt", "tracing-log", "std", "env-filter", "json"] }
try-lock = "0.2.5"
twox-hash = { version = "1.6.3", default-features = false }

View File

@@ -45,7 +45,7 @@ COPY --chown=nonroot . .
ARG ADDITIONAL_RUSTFLAGS
RUN set -e \
&& RUSTFLAGS="-Clinker=clang -Clink-arg=-fuse-ld=mold -Clink-arg=-Wl,--no-rosegment -Cforce-frame-pointers=yes ${ADDITIONAL_RUSTFLAGS}" cargo build \
&& RUSTFLAGS="-Clinker=clang -Clink-arg=-fuse-ld=mold -Clink-arg=-Wl,--no-rosegment -Cforce-frame-pointers=yes --cfg=tokio_unstable ${ADDITIONAL_RUSTFLAGS}" cargo build \
--bin pg_sni_router \
--bin pageserver \
--bin pagectl \

View File

@@ -3,8 +3,13 @@ ARG DEBIAN_VERSION=bookworm
FROM debian:bookworm-slim AS pgcopydb_builder
ARG DEBIAN_VERSION
# Use strict mode for bash to catch errors early
SHELL ["/bin/bash", "-euo", "pipefail", "-c"]
# By default, /bin/sh used in debian images will treat '\n' as eol,
# but as we use bash as SHELL, and built-in echo in bash requires '-e' flag for that.
RUN echo 'Acquire::Retries "5";' > /etc/apt/apt.conf.d/80-retries && \
echo -e "retry_connrefused = on\ntimeout=15\ntries=5\n" > /root/.wgetrc \
echo -e "retry_connrefused = on\ntimeout=15\ntries=5\n" > /root/.wgetrc && \
echo -e "--retry-connrefused\n--connect-timeout 15\n--retry 5\n--max-time 300\n" > /root/.curlrc
RUN if [ "${DEBIAN_VERSION}" = "bookworm" ]; then \
@@ -55,7 +60,8 @@ ARG DEBIAN_VERSION
# Add nonroot user
RUN useradd -ms /bin/bash nonroot -b /home
SHELL ["/bin/bash", "-c"]
# Use strict mode for bash to catch errors early
SHELL ["/bin/bash", "-euo", "pipefail", "-c"]
RUN mkdir -p /pgcopydb/bin && \
mkdir -p /pgcopydb/lib && \
@@ -66,7 +72,7 @@ COPY --from=pgcopydb_builder /usr/lib/postgresql/16/bin/pgcopydb /pgcopydb/bin/p
COPY --from=pgcopydb_builder /pgcopydb/lib/libpq.so.5 /pgcopydb/lib/libpq.so.5
RUN echo 'Acquire::Retries "5";' > /etc/apt/apt.conf.d/80-retries && \
echo -e "retry_connrefused = on\ntimeout=15\ntries=5\n" > /root/.wgetrc \
echo -e "retry_connrefused = on\ntimeout=15\ntries=5\n" > /root/.wgetrc && \
echo -e "--retry-connrefused\n--connect-timeout 15\n--retry 5\n--max-time 300\n" > /root/.curlrc
# System deps
@@ -190,8 +196,14 @@ RUN set -e \
# It includes several bug fixes on top on v2.0 release (https://github.com/linux-test-project/lcov/compare/v2.0...master)
# And patches from us:
# - Generates json file with code coverage summary (https://github.com/neondatabase/lcov/commit/426e7e7a22f669da54278e9b55e6d8caabd00af0.tar.gz)
RUN for package in Capture::Tiny DateTime Devel::Cover Digest::MD5 File::Spec JSON::XS Memory::Process Time::HiRes JSON; do yes | perl -MCPAN -e "CPAN::Shell->notest('install', '$package')"; done \
&& wget https://github.com/neondatabase/lcov/archive/426e7e7a22f669da54278e9b55e6d8caabd00af0.tar.gz -O lcov.tar.gz \
RUN set +o pipefail && \
for package in Capture::Tiny DateTime Devel::Cover Digest::MD5 File::Spec JSON::XS Memory::Process Time::HiRes JSON; do \
yes | perl -MCPAN -e "CPAN::Shell->notest('install', '$package')";\
done && \
set -o pipefail
# Split into separate step to debug flaky failures here
RUN wget https://github.com/neondatabase/lcov/archive/426e7e7a22f669da54278e9b55e6d8caabd00af0.tar.gz -O lcov.tar.gz \
&& ls -laht lcov.tar.gz && sha256sum lcov.tar.gz \
&& echo "61a22a62e20908b8b9e27d890bd0ea31f567a7b9668065589266371dcbca0992 lcov.tar.gz" | sha256sum --check \
&& mkdir -p lcov && tar -xzf lcov.tar.gz -C lcov --strip-components=1 \
&& cd lcov \

View File

@@ -96,8 +96,10 @@ ARG DEBIAN_VERSION
# Use strict mode for bash to catch errors early
SHELL ["/bin/bash", "-euo", "pipefail", "-c"]
# By default, /bin/sh used in debian images will treat '\n' as eol,
# but as we use bash as SHELL, and built-in echo in bash requires '-e' flag for that.
RUN echo 'Acquire::Retries "5";' > /etc/apt/apt.conf.d/80-retries && \
echo -e "retry_connrefused = on\ntimeout=15\ntries=5\n" > /root/.wgetrc \
echo -e "retry_connrefused = on\ntimeout=15\ntries=5\n" > /root/.wgetrc && \
echo -e "--retry-connrefused\n--connect-timeout 15\n--retry 5\n--max-time 300\n" > /root/.curlrc
RUN case $DEBIAN_VERSION in \
@@ -1068,6 +1070,7 @@ ENV PATH="/home/nonroot/.cargo/bin:$PATH"
USER nonroot
WORKDIR /home/nonroot
# See comment on the top of the file regading `echo` and `\n`
RUN echo -e "--retry-connrefused\n--connect-timeout 15\n--retry 5\n--max-time 300\n" > /home/nonroot/.curlrc
RUN curl -sSO https://static.rust-lang.org/rustup/dist/$(uname -m)-unknown-linux-gnu/rustup-init && \
@@ -1584,6 +1587,7 @@ FROM alpine/curl:${ALPINE_CURL_VERSION} AS exporters
ARG TARGETARCH
# Keep sql_exporter version same as in build-tools.Dockerfile and
# test_runner/regress/test_compute_metrics.py
# See comment on the top of the file regading `echo`, `-e` and `\n`
RUN echo -e "--retry-connrefused\n--connect-timeout 15\n--retry 5\n--max-time 300\n" > /root/.curlrc; \
if [ "$TARGETARCH" = "amd64" ]; then\
postgres_exporter_sha256='027e75dda7af621237ff8f5ac66b78a40b0093595f06768612b92b1374bd3105';\
@@ -1700,6 +1704,10 @@ ENV PGDATABASE=postgres
#########################################################################################
FROM debian:$DEBIAN_FLAVOR
ARG DEBIAN_VERSION
# Use strict mode for bash to catch errors early
SHELL ["/bin/bash", "-euo", "pipefail", "-c"]
# Add user postgres
RUN mkdir /var/db && useradd -m -d /var/db/postgres postgres && \
echo "postgres:test_console_pass" | chpasswd && \

View File

@@ -32,6 +32,7 @@ reason = "the marvin attack only affects private key decryption, not public key
# https://embarkstudios.github.io/cargo-deny/checks/licenses/cfg.html
[licenses]
allow = [
"0BSD",
"Apache-2.0",
"BSD-2-Clause",
"BSD-3-Clause",

View File

@@ -52,6 +52,7 @@ for pg_version in ${TEST_VERSION_ONLY-14 15 16 17}; do
if [ $pg_version -ge 16 ]; then
docker cp ext-src $TEST_CONTAINER_NAME:/
docker exec $TEST_CONTAINER_NAME bash -c "apt update && apt install -y libtap-parser-sourcehandler-pgtap-perl"
# This is required for the pg_hint_plan test, to prevent flaky log message causing the test to fail
# It cannot be moved to Dockerfile now because the database directory is created after the start of the container
echo Adding dummy config

View File

@@ -0,0 +1,4 @@
#!/bin/bash
set -ex
cd "$(dirname "${0}")"
pg_prove test.sql

View File

@@ -0,0 +1,15 @@
diff --git a/test.sql b/test.sql
index d7a0ca8..f15bc76 100644
--- a/test.sql
+++ b/test.sql
@@ -9,9 +9,7 @@
\set ON_ERROR_STOP true
\set QUIET 1
-CREATE EXTENSION pgcrypto;
-CREATE EXTENSION pgtap;
-CREATE EXTENSION pgjwt;
+CREATE EXTENSION IF NOT EXISTS pgtap;
BEGIN;
SELECT plan(23);

View File

@@ -0,0 +1,5 @@
#!/bin/sh
set -ex
cd "$(dirname ${0})"
patch -p1 <test-upgrade.patch
pg_prove test.sql

View File

@@ -24,7 +24,7 @@ function wait_for_ready {
}
function create_extensions() {
for ext in ${1}; do
docker compose exec neon-test-extensions psql -X -v ON_ERROR_STOP=1 -d contrib_regression -c "CREATE EXTENSION IF NOT EXISTS ${ext}"
docker compose exec neon-test-extensions psql -X -v ON_ERROR_STOP=1 -d contrib_regression -c "CREATE EXTENSION IF NOT EXISTS ${ext} CASCADE"
done
}
EXTENSIONS='[
@@ -40,7 +40,8 @@ EXTENSIONS='[
{"extname": "pg_uuidv7", "extdir": "pg_uuidv7-src"},
{"extname": "roaringbitmap", "extdir": "pg_roaringbitmap-src"},
{"extname": "semver", "extdir": "pg_semver-src"},
{"extname": "pg_ivm", "extdir": "pg_ivm-src"}
{"extname": "pg_ivm", "extdir": "pg_ivm-src"},
{"extname": "pgjwt", "extdir": "pgjwt-src"}
]'
EXTNAMES=$(echo ${EXTENSIONS} | jq -r '.[].extname' | paste -sd ' ' -)
TAG=${NEWTAG} docker compose --profile test-extensions up --quiet-pull --build -d

View File

@@ -2379,7 +2379,6 @@ pub(crate) struct WalIngestMetrics {
pub(crate) records_committed: IntCounter,
pub(crate) records_filtered: IntCounter,
pub(crate) gap_blocks_zeroed_on_rel_extend: IntCounter,
pub(crate) clear_vm_bits_unknown: IntCounterVec,
}
pub(crate) static WAL_INGEST: Lazy<WalIngestMetrics> = Lazy::new(|| {
@@ -2414,12 +2413,6 @@ pub(crate) static WAL_INGEST: Lazy<WalIngestMetrics> = Lazy::new(|| {
"Total number of zero gap blocks written on relation extends"
)
.expect("failed to define a metric"),
clear_vm_bits_unknown: register_int_counter_vec!(
"pageserver_wal_ingest_clear_vm_bits_unknown",
"Number of ignored ClearVmBits operations due to unknown pages/relations",
&["entity"],
)
.expect("failed to define a metric"),
}
});

View File

@@ -9,13 +9,14 @@ use crate::{
metrics::SECONDARY_MODE,
tenant::{
config::AttachmentMode,
mgr::GetTenantError,
mgr::TenantManager,
mgr::{GetTenantError, TenantManager},
remote_timeline_client::remote_heatmap_path,
span::debug_assert_current_span_has_tenant_id,
tasks::{warn_when_period_overrun, BackgroundLoopKind},
Tenant,
},
virtual_file::VirtualFile,
TEMP_FILE_SUFFIX,
};
use futures::Future;
@@ -32,7 +33,10 @@ use super::{
};
use tokio_util::sync::CancellationToken;
use tracing::{info_span, instrument, Instrument};
use utils::{backoff, completion::Barrier, yielding_loop::yielding_loop};
use utils::{
backoff, completion::Barrier, crashsafe::path_with_suffix_extension,
yielding_loop::yielding_loop,
};
pub(super) async fn heatmap_uploader_task(
tenant_manager: Arc<TenantManager>,
@@ -461,6 +465,18 @@ async fn upload_tenant_heatmap(
}
}
// After a successful upload persist the fresh heatmap to disk.
// When restarting, the tenant will read the heatmap from disk
// and additively generate a new heatmap (see [`Timeline::generate_heatmap`]).
// If the heatmap is stale, the additive generation can lead to keeping previously
// evicted timelines on the secondarie's disk.
let tenant_shard_id = tenant.get_tenant_shard_id();
let heatmap_path = tenant.conf.tenant_heatmap_path(tenant_shard_id);
let temp_path = path_with_suffix_extension(&heatmap_path, TEMP_FILE_SUFFIX);
if let Err(err) = VirtualFile::crashsafe_overwrite(heatmap_path, temp_path, bytes).await {
tracing::warn!("Non fatal IO error writing to disk after heatmap upload: {err}");
}
tracing::info!("Successfully uploaded {size} byte heatmap to {path}");
Ok(UploadHeatmapOutcome::Uploaded(LastUploadState {

View File

@@ -28,17 +28,9 @@ use std::time::Duration;
use std::time::Instant;
use std::time::SystemTime;
use pageserver_api::shard::ShardIdentity;
use postgres_ffi::fsm_logical_to_physical;
use postgres_ffi::walrecord::*;
use postgres_ffi::{dispatch_pgversion, enum_pgversion, enum_pgversion_dispatch, TimestampTz};
use wal_decoder::models::*;
use anyhow::{bail, Result};
use bytes::{Buf, Bytes};
use tracing::*;
use utils::failpoint_support;
use utils::rate_limit::RateLimit;
use crate::context::RequestContext;
use crate::metrics::WAL_INGEST;
@@ -50,11 +42,18 @@ use crate::ZERO_PAGE;
use pageserver_api::key::rel_block_to_key;
use pageserver_api::record::NeonWalRecord;
use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind};
use pageserver_api::shard::ShardIdentity;
use postgres_ffi::fsm_logical_to_physical;
use postgres_ffi::pg_constants;
use postgres_ffi::relfile_utils::{FSM_FORKNUM, INIT_FORKNUM, MAIN_FORKNUM, VISIBILITYMAP_FORKNUM};
use postgres_ffi::walrecord::*;
use postgres_ffi::TransactionId;
use postgres_ffi::{dispatch_pgversion, enum_pgversion, enum_pgversion_dispatch, TimestampTz};
use utils::bin_ser::SerializeError;
use utils::lsn::Lsn;
use utils::rate_limit::RateLimit;
use utils::{critical, failpoint_support};
use wal_decoder::models::*;
enum_pgversion! {CheckPoint, pgv::CheckPoint}
@@ -327,93 +326,75 @@ impl WalIngest {
let mut new_vm_blk = new_heap_blkno.map(pg_constants::HEAPBLK_TO_MAPBLOCK);
let mut old_vm_blk = old_heap_blkno.map(pg_constants::HEAPBLK_TO_MAPBLOCK);
// Sometimes, Postgres seems to create heap WAL records with the
// ALL_VISIBLE_CLEARED flag set, even though the bit in the VM page is
// not set. In fact, it's possible that the VM page does not exist at all.
// In that case, we don't want to store a record to clear the VM bit;
// replaying it would fail to find the previous image of the page, because
// it doesn't exist. So check if the VM page(s) exist, and skip the WAL
// record if it doesn't.
//
// TODO: analyze the metrics and tighten this up accordingly. This logic
// implicitly assumes that VM pages see explicit WAL writes before
// implicit ClearVmBits, and will otherwise silently drop updates.
// VM bits can only be cleared on the shard(s) owning the VM relation, and must be within
// its view of the VM relation size. Out of caution, error instead of failing WAL ingestion,
// as there has historically been cases where PostgreSQL has cleared spurious VM pages. See:
// https://github.com/neondatabase/neon/pull/10634.
let Some(vm_size) = get_relsize(modification, vm_rel, ctx).await? else {
WAL_INGEST
.clear_vm_bits_unknown
.with_label_values(&["relation"])
.inc();
critical!("clear_vm_bits for unknown VM relation {vm_rel}");
return Ok(());
};
if let Some(blknum) = new_vm_blk {
if blknum >= vm_size {
WAL_INGEST
.clear_vm_bits_unknown
.with_label_values(&["new_page"])
.inc();
critical!("new_vm_blk {blknum} not in {vm_rel} of size {vm_size}");
new_vm_blk = None;
}
}
if let Some(blknum) = old_vm_blk {
if blknum >= vm_size {
WAL_INGEST
.clear_vm_bits_unknown
.with_label_values(&["old_page"])
.inc();
critical!("old_vm_blk {blknum} not in {vm_rel} of size {vm_size}");
old_vm_blk = None;
}
}
if new_vm_blk.is_some() || old_vm_blk.is_some() {
if new_vm_blk == old_vm_blk {
// An UPDATE record that needs to clear the bits for both old and the
// new page, both of which reside on the same VM page.
if new_vm_blk.is_none() && old_vm_blk.is_none() {
return Ok(());
} else if new_vm_blk == old_vm_blk {
// An UPDATE record that needs to clear the bits for both old and the new page, both of
// which reside on the same VM page.
self.put_rel_wal_record(
modification,
vm_rel,
new_vm_blk.unwrap(),
NeonWalRecord::ClearVisibilityMapFlags {
new_heap_blkno,
old_heap_blkno,
flags,
},
ctx,
)
.await?;
} else {
// Clear VM bits for one heap page, or for two pages that reside on different VM pages.
if let Some(new_vm_blk) = new_vm_blk {
self.put_rel_wal_record(
modification,
vm_rel,
new_vm_blk.unwrap(),
new_vm_blk,
NeonWalRecord::ClearVisibilityMapFlags {
new_heap_blkno,
old_heap_blkno: None,
flags,
},
ctx,
)
.await?;
}
if let Some(old_vm_blk) = old_vm_blk {
self.put_rel_wal_record(
modification,
vm_rel,
old_vm_blk,
NeonWalRecord::ClearVisibilityMapFlags {
new_heap_blkno: None,
old_heap_blkno,
flags,
},
ctx,
)
.await?;
} else {
// Clear VM bits for one heap page, or for two pages that reside on
// different VM pages.
if let Some(new_vm_blk) = new_vm_blk {
self.put_rel_wal_record(
modification,
vm_rel,
new_vm_blk,
NeonWalRecord::ClearVisibilityMapFlags {
new_heap_blkno,
old_heap_blkno: None,
flags,
},
ctx,
)
.await?;
}
if let Some(old_vm_blk) = old_vm_blk {
self.put_rel_wal_record(
modification,
vm_rel,
old_vm_blk,
NeonWalRecord::ClearVisibilityMapFlags {
new_heap_blkno: None,
old_heap_blkno,
flags,
},
ctx,
)
.await?;
}
}
}
Ok(())
}

View File

@@ -19,6 +19,7 @@ aws-config.workspace = true
aws-sdk-iam.workspace = true
aws-sigv4.workspace = true
base64.workspace = true
boxcar = "0.2.8"
bstr.workspace = true
bytes = { workspace = true, features = ["serde"] }
camino.workspace = true
@@ -26,6 +27,7 @@ chrono.workspace = true
clap = { workspace = true, features = ["derive", "env"] }
clashmap.workspace = true
compute_api.workspace = true
console-subscriber.workspace = true
consumption_metrics.workspace = true
env_logger.workspace = true
framed-websockets.workspace = true
@@ -42,6 +44,7 @@ hyper0.workspace = true
hyper = { workspace = true, features = ["server", "http1", "http2"] }
hyper-util = { version = "0.1", features = ["server", "http1", "http2", "tokio"] }
http-body-util = { version = "0.1" }
gettid = "0.1.3"
indexmap = { workspace = true, features = ["serde"] }
ipnet.workspace = true
itertools.workspace = true
@@ -50,6 +53,8 @@ lasso = { workspace = true, features = ["multi-threaded"] }
measured = { workspace = true, features = ["lasso"] }
metrics.workspace = true
once_cell.workspace = true
opentelemetry = { workspace = true, features = ["trace"] }
papaya = "0.1.8"
parking_lot.workspace = true
parquet.workspace = true
parquet_derive.workspace = true
@@ -89,6 +94,9 @@ tokio = { workspace = true, features = ["signal"] }
tracing-subscriber.workspace = true
tracing-utils.workspace = true
tracing.workspace = true
tracing-log.workspace = true
tracing-serde.workspace = true
tracing-opentelemetry.workspace = true
try-lock.workspace = true
typed-json.workspace = true
url.workspace = true
@@ -112,6 +120,7 @@ rsa = "0.9"
workspace_hack.workspace = true
[dev-dependencies]
assert-json-diff.workspace = true
camino-tempfile.workspace = true
fallible-iterator.workspace = true
flate2.workspace = true
@@ -122,3 +131,6 @@ rstest.workspace = true
walkdir.workspace = true
rand_distr = "0.4"
tokio-postgres.workspace = true
[lints.rust]
unexpected_cfgs = { level = "warn", check-cfg = ['cfg(tokio_unstable)'] }

View File

@@ -1,10 +1,23 @@
use tracing::Subscriber;
use std::cell::{Cell, RefCell};
use std::collections::HashMap;
use std::hash::BuildHasher;
use std::{env, io};
use chrono::{DateTime, Utc};
use opentelemetry::trace::TraceContextExt;
use scopeguard::defer;
use serde::ser::{SerializeMap, Serializer};
use tracing::span;
use tracing::subscriber::Interest;
use tracing::{callsite, Event, Metadata, Span, Subscriber};
use tracing_opentelemetry::OpenTelemetrySpanExt;
use tracing_subscriber::filter::{EnvFilter, LevelFilter};
use tracing_subscriber::fmt::format::{Format, Full};
use tracing_subscriber::fmt::time::SystemTime;
use tracing_subscriber::fmt::{FormatEvent, FormatFields};
use tracing_subscriber::layer::{Context, Identity, Layer};
use tracing_subscriber::prelude::*;
use tracing_subscriber::registry::LookupSpan;
use tracing_subscriber::registry::{LookupSpan, SpanRef};
/// Initialize logging and OpenTelemetry tracing and exporter.
///
@@ -15,6 +28,8 @@ use tracing_subscriber::registry::LookupSpan;
/// destination, set `OTEL_EXPORTER_OTLP_ENDPOINT=http://jaeger:4318`.
/// See <https://opentelemetry.io/docs/reference/specification/sdk-environment-variables>
pub async fn init() -> anyhow::Result<LoggingGuard> {
let logfmt = LogFormat::from_env()?;
let env_filter = EnvFilter::builder()
.with_default_directive(LevelFilter::INFO.into())
.from_env_lossy()
@@ -29,17 +44,48 @@ pub async fn init() -> anyhow::Result<LoggingGuard> {
.expect("this should be a valid filter directive"),
);
let fmt_layer = tracing_subscriber::fmt::layer()
.with_ansi(false)
.with_writer(std::io::stderr)
.with_target(false);
let otlp_layer = tracing_utils::init_tracing("proxy").await;
let json_log_layer = if logfmt == LogFormat::Json {
Some(JsonLoggingLayer {
clock: RealClock,
skipped_field_indices: papaya::HashMap::default(),
writer: StderrWriter {
stderr: std::io::stderr(),
},
})
} else {
None
};
let text_log_layer = if logfmt == LogFormat::Text {
Some(
tracing_subscriber::fmt::layer()
.with_ansi(false)
.with_writer(std::io::stderr)
.with_target(false),
)
} else {
None
};
let export_layer = Identity::new()
.and_then(otlp_layer)
.and_then(json_log_layer)
.and_then(text_log_layer)
.with_filter(env_filter);
#[cfg(not(tokio_unstable))]
let tokio_console_layer = Identity::new();
#[cfg(tokio_unstable)]
let tokio_console_layer = console_subscriber::ConsoleLayer::builder()
.with_default_env()
.enable_grpc_web(true)
.spawn();
tracing_subscriber::registry()
.with(env_filter)
.with(otlp_layer)
.with(fmt_layer)
.with(export_layer)
.with(tokio_console_layer)
.try_init()?;
Ok(LoggingGuard)
@@ -94,3 +140,857 @@ impl Drop for LoggingGuard {
tracing_utils::shutdown_tracing();
}
}
// TODO: make JSON the default
#[derive(Copy, Clone, PartialEq, Eq, Default, Debug)]
enum LogFormat {
#[default]
Text = 1,
Json,
}
impl LogFormat {
fn from_env() -> anyhow::Result<Self> {
let logfmt = env::var("LOGFMT");
Ok(match logfmt.as_deref() {
Err(_) => LogFormat::default(),
Ok("text") => LogFormat::Text,
Ok("json") => LogFormat::Json,
Ok(logfmt) => anyhow::bail!("unknown log format: {logfmt}"),
})
}
}
trait MakeWriter {
fn make_writer(&self) -> impl io::Write;
}
struct StderrWriter {
stderr: io::Stderr,
}
impl MakeWriter for StderrWriter {
#[inline]
fn make_writer(&self) -> impl io::Write {
self.stderr.lock()
}
}
// TODO: move into separate module or even separate crate.
trait Clock {
fn now(&self) -> DateTime<Utc>;
}
struct RealClock;
impl Clock for RealClock {
#[inline]
fn now(&self) -> DateTime<Utc> {
Utc::now()
}
}
/// Name of the field used by tracing crate to store the event message.
const MESSAGE_FIELD: &str = "message";
thread_local! {
/// Protects against deadlocks and double panics during log writing.
/// The current panic handler will use tracing to log panic information.
static REENTRANCY_GUARD: Cell<bool> = const { Cell::new(false) };
/// Thread-local instance with per-thread buffer for log writing.
static EVENT_FORMATTER: RefCell<EventFormatter> = RefCell::new(EventFormatter::new());
/// Cached OS thread ID.
static THREAD_ID: u64 = gettid::gettid();
}
/// Implements tracing layer to handle events specific to logging.
struct JsonLoggingLayer<C: Clock, W: MakeWriter> {
clock: C,
skipped_field_indices: papaya::HashMap<callsite::Identifier, SkippedFieldIndices>,
writer: W,
}
impl<S, C: Clock + 'static, W: MakeWriter + 'static> Layer<S> for JsonLoggingLayer<C, W>
where
S: Subscriber + for<'a> LookupSpan<'a>,
{
fn on_event(&self, event: &Event<'_>, ctx: Context<'_, S>) {
use std::io::Write;
// TODO: consider special tracing subscriber to grab timestamp very
// early, before OTel machinery, and add as event extension.
let now = self.clock.now();
let res: io::Result<()> = REENTRANCY_GUARD.with(move |entered| {
if entered.get() {
let mut formatter = EventFormatter::new();
formatter.format(now, event, &ctx, &self.skipped_field_indices)?;
self.writer.make_writer().write_all(formatter.buffer())
} else {
entered.set(true);
defer!(entered.set(false););
EVENT_FORMATTER.with_borrow_mut(move |formatter| {
formatter.reset();
formatter.format(now, event, &ctx, &self.skipped_field_indices)?;
self.writer.make_writer().write_all(formatter.buffer())
})
}
});
// In case logging fails we generate a simpler JSON object.
if let Err(err) = res {
if let Ok(mut line) = serde_json::to_vec(&serde_json::json!( {
"timestamp": now.to_rfc3339_opts(chrono::SecondsFormat::Micros, true),
"level": "ERROR",
"message": format_args!("cannot log event: {err:?}"),
"fields": {
"event": format_args!("{event:?}"),
},
})) {
line.push(b'\n');
self.writer.make_writer().write_all(&line).ok();
}
}
}
/// Registers a SpanFields instance as span extension.
fn on_new_span(&self, attrs: &span::Attributes<'_>, id: &span::Id, ctx: Context<'_, S>) {
let span = ctx.span(id).expect("span must exist");
let fields = SpanFields::default();
fields.record_fields(attrs);
// This could deadlock when there's a panic somewhere in the tracing
// event handling and a read or write guard is still held. This includes
// the OTel subscriber.
span.extensions_mut().insert(fields);
}
fn on_record(&self, id: &span::Id, values: &span::Record<'_>, ctx: Context<'_, S>) {
let span = ctx.span(id).expect("span must exist");
let ext = span.extensions();
if let Some(data) = ext.get::<SpanFields>() {
data.record_fields(values);
}
}
/// Called (lazily) whenever a new log call is executed. We quickly check
/// for duplicate field names and record duplicates as skippable. Last one
/// wins.
fn register_callsite(&self, metadata: &'static Metadata<'static>) -> Interest {
if !metadata.is_event() {
// Must not be never because we wouldn't get trace and span data.
return Interest::always();
}
let mut field_indices = SkippedFieldIndices::default();
let mut seen_fields = HashMap::<&'static str, usize>::new();
for field in metadata.fields() {
use std::collections::hash_map::Entry;
match seen_fields.entry(field.name()) {
Entry::Vacant(entry) => {
// field not seen yet
entry.insert(field.index());
}
Entry::Occupied(mut entry) => {
// replace currently stored index
let old_index = entry.insert(field.index());
// ... and append it to list of skippable indices
field_indices.push(old_index);
}
}
}
if !field_indices.is_empty() {
self.skipped_field_indices
.pin()
.insert(metadata.callsite(), field_indices);
}
Interest::always()
}
}
/// Stores span field values recorded during the spans lifetime.
#[derive(Default)]
struct SpanFields {
// TODO: Switch to custom enum with lasso::Spur for Strings?
fields: papaya::HashMap<&'static str, serde_json::Value>,
}
impl SpanFields {
#[inline]
fn record_fields<R: tracing_subscriber::field::RecordFields>(&self, fields: R) {
fields.record(&mut SpanFieldsRecorder {
fields: self.fields.pin(),
});
}
}
/// Implements a tracing field visitor to convert and store values.
struct SpanFieldsRecorder<'m, S, G> {
fields: papaya::HashMapRef<'m, &'static str, serde_json::Value, S, G>,
}
impl<S: BuildHasher, G: papaya::Guard> tracing::field::Visit for SpanFieldsRecorder<'_, S, G> {
#[inline]
fn record_f64(&mut self, field: &tracing::field::Field, value: f64) {
self.fields
.insert(field.name(), serde_json::Value::from(value));
}
#[inline]
fn record_i64(&mut self, field: &tracing::field::Field, value: i64) {
self.fields
.insert(field.name(), serde_json::Value::from(value));
}
#[inline]
fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
self.fields
.insert(field.name(), serde_json::Value::from(value));
}
#[inline]
fn record_i128(&mut self, field: &tracing::field::Field, value: i128) {
if let Ok(value) = i64::try_from(value) {
self.fields
.insert(field.name(), serde_json::Value::from(value));
} else {
self.fields
.insert(field.name(), serde_json::Value::from(format!("{value}")));
}
}
#[inline]
fn record_u128(&mut self, field: &tracing::field::Field, value: u128) {
if let Ok(value) = u64::try_from(value) {
self.fields
.insert(field.name(), serde_json::Value::from(value));
} else {
self.fields
.insert(field.name(), serde_json::Value::from(format!("{value}")));
}
}
#[inline]
fn record_bool(&mut self, field: &tracing::field::Field, value: bool) {
self.fields
.insert(field.name(), serde_json::Value::from(value));
}
#[inline]
fn record_bytes(&mut self, field: &tracing::field::Field, value: &[u8]) {
self.fields
.insert(field.name(), serde_json::Value::from(value));
}
#[inline]
fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
self.fields
.insert(field.name(), serde_json::Value::from(value));
}
#[inline]
fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
self.fields
.insert(field.name(), serde_json::Value::from(format!("{value:?}")));
}
#[inline]
fn record_error(
&mut self,
field: &tracing::field::Field,
value: &(dyn std::error::Error + 'static),
) {
self.fields
.insert(field.name(), serde_json::Value::from(format!("{value}")));
}
}
/// List of field indices skipped during logging. Can list duplicate fields or
/// metafields not meant to be logged.
#[derive(Clone, Default)]
struct SkippedFieldIndices {
bits: u64,
}
impl SkippedFieldIndices {
#[inline]
fn is_empty(&self) -> bool {
self.bits == 0
}
#[inline]
fn push(&mut self, index: usize) {
self.bits |= 1u64
.checked_shl(index as u32)
.expect("field index too large");
}
#[inline]
fn contains(&self, index: usize) -> bool {
self.bits
& 1u64
.checked_shl(index as u32)
.expect("field index too large")
!= 0
}
}
/// Formats a tracing event and writes JSON to its internal buffer including a newline.
// TODO: buffer capacity management, truncate if too large
struct EventFormatter {
logline_buffer: Vec<u8>,
}
impl EventFormatter {
#[inline]
fn new() -> Self {
EventFormatter {
logline_buffer: Vec::new(),
}
}
#[inline]
fn buffer(&self) -> &[u8] {
&self.logline_buffer
}
#[inline]
fn reset(&mut self) {
self.logline_buffer.clear();
}
fn format<S>(
&mut self,
now: DateTime<Utc>,
event: &Event<'_>,
ctx: &Context<'_, S>,
skipped_field_indices: &papaya::HashMap<callsite::Identifier, SkippedFieldIndices>,
) -> io::Result<()>
where
S: Subscriber + for<'a> LookupSpan<'a>,
{
let timestamp = now.to_rfc3339_opts(chrono::SecondsFormat::Micros, true);
use tracing_log::NormalizeEvent;
let normalized_meta = event.normalized_metadata();
let meta = normalized_meta.as_ref().unwrap_or_else(|| event.metadata());
let skipped_field_indices = skipped_field_indices.pin();
let skipped_field_indices = skipped_field_indices.get(&meta.callsite());
let mut serialize = || {
let mut serializer = serde_json::Serializer::new(&mut self.logline_buffer);
let mut serializer = serializer.serialize_map(None)?;
// Timestamp comes first, so raw lines can be sorted by timestamp.
serializer.serialize_entry("timestamp", &timestamp)?;
// Level next.
serializer.serialize_entry("level", &meta.level().as_str())?;
// Message next.
serializer.serialize_key("message")?;
let mut message_extractor =
MessageFieldExtractor::new(serializer, skipped_field_indices);
event.record(&mut message_extractor);
let mut serializer = message_extractor.into_serializer()?;
let mut fields_present = FieldsPresent(false, skipped_field_indices);
event.record(&mut fields_present);
if fields_present.0 {
serializer.serialize_entry(
"fields",
&SerializableEventFields(event, skipped_field_indices),
)?;
}
let pid = std::process::id();
if pid != 1 {
serializer.serialize_entry("process_id", &pid)?;
}
THREAD_ID.with(|tid| serializer.serialize_entry("thread_id", tid))?;
// TODO: tls cache? name could change
if let Some(thread_name) = std::thread::current().name() {
if !thread_name.is_empty() && thread_name != "tokio-runtime-worker" {
serializer.serialize_entry("thread_name", thread_name)?;
}
}
if let Some(task_id) = tokio::task::try_id() {
serializer.serialize_entry("task_id", &format_args!("{task_id}"))?;
}
serializer.serialize_entry("target", meta.target())?;
if let Some(module) = meta.module_path() {
if module != meta.target() {
serializer.serialize_entry("module", module)?;
}
}
if let Some(file) = meta.file() {
if let Some(line) = meta.line() {
serializer.serialize_entry("src", &format_args!("{file}:{line}"))?;
} else {
serializer.serialize_entry("src", file)?;
}
}
{
let otel_context = Span::current().context();
let otel_spanref = otel_context.span();
let span_context = otel_spanref.span_context();
if span_context.is_valid() {
serializer.serialize_entry(
"trace_id",
&format_args!("{}", span_context.trace_id()),
)?;
}
}
serializer.serialize_entry("spans", &SerializableSpanStack(ctx))?;
serializer.end()
};
serialize().map_err(io::Error::other)?;
self.logline_buffer.push(b'\n');
Ok(())
}
}
/// Extracts the message field that's mixed will other fields.
struct MessageFieldExtractor<'a, S: serde::ser::SerializeMap> {
serializer: S,
skipped_field_indices: Option<&'a SkippedFieldIndices>,
state: Option<Result<(), S::Error>>,
}
impl<'a, S: serde::ser::SerializeMap> MessageFieldExtractor<'a, S> {
#[inline]
fn new(serializer: S, skipped_field_indices: Option<&'a SkippedFieldIndices>) -> Self {
Self {
serializer,
skipped_field_indices,
state: None,
}
}
#[inline]
fn into_serializer(mut self) -> Result<S, S::Error> {
match self.state {
Some(Ok(())) => {}
Some(Err(err)) => return Err(err),
None => self.serializer.serialize_value("")?,
}
Ok(self.serializer)
}
#[inline]
fn accept_field(&self, field: &tracing::field::Field) -> bool {
self.state.is_none()
&& field.name() == MESSAGE_FIELD
&& !self
.skipped_field_indices
.is_some_and(|i| i.contains(field.index()))
}
}
impl<S: serde::ser::SerializeMap> tracing::field::Visit for MessageFieldExtractor<'_, S> {
#[inline]
fn record_f64(&mut self, field: &tracing::field::Field, value: f64) {
if self.accept_field(field) {
self.state = Some(self.serializer.serialize_value(&value));
}
}
#[inline]
fn record_i64(&mut self, field: &tracing::field::Field, value: i64) {
if self.accept_field(field) {
self.state = Some(self.serializer.serialize_value(&value));
}
}
#[inline]
fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
if self.accept_field(field) {
self.state = Some(self.serializer.serialize_value(&value));
}
}
#[inline]
fn record_i128(&mut self, field: &tracing::field::Field, value: i128) {
if self.accept_field(field) {
self.state = Some(self.serializer.serialize_value(&value));
}
}
#[inline]
fn record_u128(&mut self, field: &tracing::field::Field, value: u128) {
if self.accept_field(field) {
self.state = Some(self.serializer.serialize_value(&value));
}
}
#[inline]
fn record_bool(&mut self, field: &tracing::field::Field, value: bool) {
if self.accept_field(field) {
self.state = Some(self.serializer.serialize_value(&value));
}
}
#[inline]
fn record_bytes(&mut self, field: &tracing::field::Field, value: &[u8]) {
if self.accept_field(field) {
self.state = Some(self.serializer.serialize_value(&format_args!("{value:x?}")));
}
}
#[inline]
fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
if self.accept_field(field) {
self.state = Some(self.serializer.serialize_value(&value));
}
}
#[inline]
fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
if self.accept_field(field) {
self.state = Some(self.serializer.serialize_value(&format_args!("{value:?}")));
}
}
#[inline]
fn record_error(
&mut self,
field: &tracing::field::Field,
value: &(dyn std::error::Error + 'static),
) {
if self.accept_field(field) {
self.state = Some(self.serializer.serialize_value(&format_args!("{value}")));
}
}
}
/// Checks if there's any fields and field values present. If not, the JSON subobject
/// can be skipped.
// This is entirely optional and only cosmetic, though maybe helps a
// bit during log parsing in dashboards when there's no field with empty object.
struct FieldsPresent<'a>(pub bool, Option<&'a SkippedFieldIndices>);
// Even though some methods have an overhead (error, bytes) it is assumed the
// compiler won't include this since we ignore the value entirely.
impl tracing::field::Visit for FieldsPresent<'_> {
#[inline]
fn record_debug(&mut self, field: &tracing::field::Field, _: &dyn std::fmt::Debug) {
if !self.1.is_some_and(|i| i.contains(field.index()))
&& field.name() != MESSAGE_FIELD
&& !field.name().starts_with("log.")
{
self.0 |= true;
}
}
}
/// Serializes the fields directly supplied with a log event.
struct SerializableEventFields<'a, 'event>(
&'a tracing::Event<'event>,
Option<&'a SkippedFieldIndices>,
);
impl serde::ser::Serialize for SerializableEventFields<'_, '_> {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
use serde::ser::SerializeMap;
let serializer = serializer.serialize_map(None)?;
let mut message_skipper = MessageFieldSkipper::new(serializer, self.1);
self.0.record(&mut message_skipper);
let serializer = message_skipper.into_serializer()?;
serializer.end()
}
}
/// A tracing field visitor that skips the message field.
struct MessageFieldSkipper<'a, S: serde::ser::SerializeMap> {
serializer: S,
skipped_field_indices: Option<&'a SkippedFieldIndices>,
state: Result<(), S::Error>,
}
impl<'a, S: serde::ser::SerializeMap> MessageFieldSkipper<'a, S> {
#[inline]
fn new(serializer: S, skipped_field_indices: Option<&'a SkippedFieldIndices>) -> Self {
Self {
serializer,
skipped_field_indices,
state: Ok(()),
}
}
#[inline]
fn accept_field(&self, field: &tracing::field::Field) -> bool {
self.state.is_ok()
&& field.name() != MESSAGE_FIELD
&& !field.name().starts_with("log.")
&& !self
.skipped_field_indices
.is_some_and(|i| i.contains(field.index()))
}
#[inline]
fn into_serializer(self) -> Result<S, S::Error> {
self.state?;
Ok(self.serializer)
}
}
impl<S: serde::ser::SerializeMap> tracing::field::Visit for MessageFieldSkipper<'_, S> {
#[inline]
fn record_f64(&mut self, field: &tracing::field::Field, value: f64) {
if self.accept_field(field) {
self.state = self.serializer.serialize_entry(field.name(), &value);
}
}
#[inline]
fn record_i64(&mut self, field: &tracing::field::Field, value: i64) {
if self.accept_field(field) {
self.state = self.serializer.serialize_entry(field.name(), &value);
}
}
#[inline]
fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
if self.accept_field(field) {
self.state = self.serializer.serialize_entry(field.name(), &value);
}
}
#[inline]
fn record_i128(&mut self, field: &tracing::field::Field, value: i128) {
if self.accept_field(field) {
self.state = self.serializer.serialize_entry(field.name(), &value);
}
}
#[inline]
fn record_u128(&mut self, field: &tracing::field::Field, value: u128) {
if self.accept_field(field) {
self.state = self.serializer.serialize_entry(field.name(), &value);
}
}
#[inline]
fn record_bool(&mut self, field: &tracing::field::Field, value: bool) {
if self.accept_field(field) {
self.state = self.serializer.serialize_entry(field.name(), &value);
}
}
#[inline]
fn record_bytes(&mut self, field: &tracing::field::Field, value: &[u8]) {
if self.accept_field(field) {
self.state = self
.serializer
.serialize_entry(field.name(), &format_args!("{value:x?}"));
}
}
#[inline]
fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
if self.accept_field(field) {
self.state = self.serializer.serialize_entry(field.name(), &value);
}
}
#[inline]
fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
if self.accept_field(field) {
self.state = self
.serializer
.serialize_entry(field.name(), &format_args!("{value:?}"));
}
}
#[inline]
fn record_error(
&mut self,
field: &tracing::field::Field,
value: &(dyn std::error::Error + 'static),
) {
if self.accept_field(field) {
self.state = self.serializer.serialize_value(&format_args!("{value}"));
}
}
}
/// Serializes the span stack from root to leaf (parent of event) enumerated
/// inside an object where the keys are just the number padded with zeroes
/// to retain sorting order.
// The object is necessary because Loki cannot flatten arrays.
struct SerializableSpanStack<'a, 'b, Span>(&'b Context<'a, Span>)
where
Span: Subscriber + for<'lookup> LookupSpan<'lookup>;
impl<Span> serde::ser::Serialize for SerializableSpanStack<'_, '_, Span>
where
Span: Subscriber + for<'lookup> LookupSpan<'lookup>,
{
fn serialize<Ser>(&self, serializer: Ser) -> Result<Ser::Ok, Ser::Error>
where
Ser: serde::ser::Serializer,
{
let mut serializer = serializer.serialize_map(None)?;
if let Some(leaf_span) = self.0.lookup_current() {
for (i, span) in leaf_span.scope().from_root().enumerate() {
serializer.serialize_entry(&format_args!("{i:02}"), &SerializableSpan(&span))?;
}
}
serializer.end()
}
}
/// Serializes a single span. Include the span ID, name and its fields as
/// recorded up to this point.
struct SerializableSpan<'a, 'b, Span>(&'b SpanRef<'a, Span>)
where
Span: for<'lookup> LookupSpan<'lookup>;
impl<Span> serde::ser::Serialize for SerializableSpan<'_, '_, Span>
where
Span: for<'lookup> LookupSpan<'lookup>,
{
fn serialize<Ser>(&self, serializer: Ser) -> Result<Ser::Ok, Ser::Error>
where
Ser: serde::ser::Serializer,
{
let mut serializer = serializer.serialize_map(None)?;
// TODO: the span ID is probably only useful for debugging tracing.
serializer.serialize_entry("span_id", &format_args!("{:016x}", self.0.id().into_u64()))?;
serializer.serialize_entry("span_name", self.0.metadata().name())?;
let ext = self.0.extensions();
if let Some(data) = ext.get::<SpanFields>() {
for (key, value) in &data.fields.pin() {
serializer.serialize_entry(key, value)?;
}
}
serializer.end()
}
}
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
use std::sync::{Arc, Mutex, MutexGuard};
use assert_json_diff::assert_json_eq;
use tracing::info_span;
use super::*;
struct TestClock {
current_time: Mutex<DateTime<Utc>>,
}
impl Clock for Arc<TestClock> {
fn now(&self) -> DateTime<Utc> {
*self.current_time.lock().expect("poisoned")
}
}
struct VecWriter<'a> {
buffer: MutexGuard<'a, Vec<u8>>,
}
impl MakeWriter for Arc<Mutex<Vec<u8>>> {
fn make_writer(&self) -> impl io::Write {
VecWriter {
buffer: self.lock().expect("poisoned"),
}
}
}
impl io::Write for VecWriter<'_> {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.buffer.write(buf)
}
fn flush(&mut self) -> io::Result<()> {
Ok(())
}
}
#[test]
fn test_field_collection() {
let clock = Arc::new(TestClock {
current_time: Mutex::new(Utc::now()),
});
let buffer = Arc::new(Mutex::new(Vec::new()));
let log_layer = JsonLoggingLayer {
clock: clock.clone(),
skipped_field_indices: papaya::HashMap::default(),
writer: buffer.clone(),
};
let registry = tracing_subscriber::Registry::default().with(log_layer);
tracing::subscriber::with_default(registry, || {
info_span!("span1", x = 40, x = 41, x = 42).in_scope(|| {
info_span!("span2").in_scope(|| {
tracing::error!(
a = 1,
a = 2,
a = 3,
message = "explicit message field",
"implicit message field"
);
});
});
});
let buffer = Arc::try_unwrap(buffer)
.expect("no other reference")
.into_inner()
.expect("poisoned");
let actual: serde_json::Value = serde_json::from_slice(&buffer).expect("valid JSON");
let expected: serde_json::Value = serde_json::json!(
{
"timestamp": clock.now().to_rfc3339_opts(chrono::SecondsFormat::Micros, true),
"level": "ERROR",
"message": "explicit message field",
"fields": {
"a": 3,
},
"spans": {
"00":{
"span_id": "0000000000000001",
"span_name": "span1",
"x": 42,
},
"01": {
"span_id": "0000000000000002",
"span_name": "span2",
}
},
"src": actual.as_object().unwrap().get("src").unwrap().as_str().unwrap(),
"target": "proxy::logging::tests",
"process_id": actual.as_object().unwrap().get("process_id").unwrap().as_number().unwrap(),
"thread_id": actual.as_object().unwrap().get("thread_id").unwrap().as_number().unwrap(),
"thread_name": "logging::tests::test_field_collection",
}
);
assert_json_eq!(actual, expected);
}
}

View File

@@ -27,7 +27,7 @@ use pageserver_api::shard::ShardConfigError;
use pageserver_api::shard::ShardIdentity;
use pageserver_api::shard::ShardStripeSize;
use pageserver_api::shard::{ShardCount, ShardNumber, TenantShardId};
use rustls::client::danger::ServerCertVerifier;
use rustls::client::danger::{ServerCertVerified, ServerCertVerifier};
use rustls::client::WebPkiServerVerifier;
use rustls::crypto::ring;
use scoped_futures::ScopedBoxFuture;
@@ -194,6 +194,8 @@ impl Persistence {
timeout: Duration,
) -> Result<(), diesel::ConnectionError> {
let started_at = Instant::now();
log_postgres_connstr_info(database_url)
.map_err(|e| diesel::ConnectionError::InvalidConnectionUrl(e.to_string()))?;
loop {
match establish_connection_rustls(database_url).await {
Ok(_) => {
@@ -1281,6 +1283,51 @@ pub(crate) fn load_certs() -> anyhow::Result<Arc<rustls::RootCertStore>> {
Ok(Arc::new(store))
}
#[derive(Debug)]
/// A verifier that accepts all certificates (but logs an error still)
struct AcceptAll(Arc<WebPkiServerVerifier>);
impl ServerCertVerifier for AcceptAll {
fn verify_server_cert(
&self,
end_entity: &rustls::pki_types::CertificateDer<'_>,
intermediates: &[rustls::pki_types::CertificateDer<'_>],
server_name: &rustls::pki_types::ServerName<'_>,
ocsp_response: &[u8],
now: rustls::pki_types::UnixTime,
) -> Result<ServerCertVerified, rustls::Error> {
let r =
self.0
.verify_server_cert(end_entity, intermediates, server_name, ocsp_response, now);
if let Err(err) = r {
tracing::info!(
?server_name,
"ignoring db connection TLS validation error: {err:?}"
);
return Ok(ServerCertVerified::assertion());
}
r
}
fn verify_tls12_signature(
&self,
message: &[u8],
cert: &rustls::pki_types::CertificateDer<'_>,
dss: &rustls::DigitallySignedStruct,
) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
self.0.verify_tls12_signature(message, cert, dss)
}
fn verify_tls13_signature(
&self,
message: &[u8],
cert: &rustls::pki_types::CertificateDer<'_>,
dss: &rustls::DigitallySignedStruct,
) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
self.0.verify_tls13_signature(message, cert, dss)
}
fn supported_verify_schemes(&self) -> Vec<rustls::SignatureScheme> {
self.0.supported_verify_schemes()
}
}
/// Loads the root certificates and constructs a client config suitable for connecting.
/// This function is blocking.
fn client_config_with_root_certs() -> anyhow::Result<rustls::ClientConfig> {
@@ -1290,76 +1337,12 @@ fn client_config_with_root_certs() -> anyhow::Result<rustls::ClientConfig> {
.expect("ring should support the default protocol versions");
static DO_CERT_CHECKS: std::sync::OnceLock<bool> = std::sync::OnceLock::new();
let do_cert_checks =
DO_CERT_CHECKS.get_or_init(|| std::env::var("STORCON_CERT_CHECKS").is_ok());
DO_CERT_CHECKS.get_or_init(|| std::env::var("STORCON_DB_CERT_CHECKS").is_ok());
Ok(if *do_cert_checks {
client_config
.with_root_certificates(load_certs()?)
.with_no_client_auth()
} else {
use rustls::client::danger::{HandshakeSignatureValid, ServerCertVerified};
#[derive(Debug)]
struct AcceptAll(Arc<WebPkiServerVerifier>);
impl ServerCertVerifier for AcceptAll {
fn verify_server_cert(
&self,
end_entity: &rustls::pki_types::CertificateDer<'_>,
intermediates: &[rustls::pki_types::CertificateDer<'_>],
server_name: &rustls::pki_types::ServerName<'_>,
ocsp_response: &[u8],
now: rustls::pki_types::UnixTime,
) -> Result<ServerCertVerified, rustls::Error> {
let r = self.0.verify_server_cert(
end_entity,
intermediates,
server_name,
ocsp_response,
now,
);
if let Err(err) = r {
tracing::info!(
?server_name,
"ignoring db connection TLS validation error: {err:?}"
);
return Ok(ServerCertVerified::assertion());
}
r
}
fn verify_tls12_signature(
&self,
message: &[u8],
cert: &rustls::pki_types::CertificateDer<'_>,
dss: &rustls::DigitallySignedStruct,
) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error>
{
let r = self.0.verify_tls12_signature(message, cert, dss);
if let Err(err) = r {
tracing::info!(
"ignoring db connection 1.2 signature TLS validation error: {err:?}"
);
return Ok(HandshakeSignatureValid::assertion());
}
r
}
fn verify_tls13_signature(
&self,
message: &[u8],
cert: &rustls::pki_types::CertificateDer<'_>,
dss: &rustls::DigitallySignedStruct,
) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error>
{
let r = self.0.verify_tls13_signature(message, cert, dss);
if let Err(err) = r {
tracing::info!(
"ignoring db connection 1.3 signature TLS validation error: {err:?}"
);
return Ok(HandshakeSignatureValid::assertion());
}
r
}
fn supported_verify_schemes(&self) -> Vec<rustls::SignatureScheme> {
self.0.supported_verify_schemes()
}
}
let verifier = AcceptAll(
WebPkiServerVerifier::builder_with_provider(
load_certs()?,
@@ -1389,6 +1372,29 @@ fn establish_connection_rustls(config: &str) -> BoxFuture<ConnectionResult<Async
fut.boxed()
}
#[cfg_attr(test, test)]
fn test_config_debug_censors_password() {
let has_pw =
"host=/var/lib/postgresql,localhost port=1234 user=specialuser password='NOT ALLOWED TAG'";
let has_pw_cfg = has_pw.parse::<tokio_postgres::Config>().unwrap();
assert!(format!("{has_pw_cfg:?}").contains("specialuser"));
// Ensure that the password is not leaked by the debug impl
assert!(!format!("{has_pw_cfg:?}").contains("NOT ALLOWED TAG"));
}
fn log_postgres_connstr_info(config_str: &str) -> anyhow::Result<()> {
let config = config_str
.parse::<tokio_postgres::Config>()
.map_err(|_e| anyhow::anyhow!("Couldn't parse config str"))?;
// We use debug formatting here, and use a unit test to ensure that we don't leak the password.
// To make extra sure the test gets ran, run it every time the function is called
// (this is rather cold code, we can afford it).
#[cfg(not(test))]
test_config_debug_censors_password();
tracing::info!("database connection config: {config:?}");
Ok(())
}
/// Parts of [`crate::tenant_shard::TenantShard`] that are stored durably
#[derive(
QueryableByName, Queryable, Selectable, Insertable, Serialize, Deserialize, Clone, Eq, PartialEq,

View File

@@ -2766,6 +2766,11 @@ class NeonPageserver(PgProtocol, LogUtils):
log.error(f"Failed to decode LocationConf, raw content ({len(bytes)} bytes): {bytes}")
raise
def heatmap_content(self, tenant_shard_id: TenantId | TenantShardId) -> Any:
path = self.tenant_dir(tenant_shard_id) / "heatmap-v1.json"
with open(path) as f:
return json.load(f)
def tenant_create(
self,
tenant_id: TenantId,

View File

@@ -250,6 +250,9 @@ def test_pageserver_gc_compaction_idempotent(
workload.churn_rows(row_count, env.pageserver.id)
# compact 3 times if mode is before_restart
n_compactions = 3 if compaction_mode == "before_restart" else 1
ps_http.timeline_compact(
tenant_id, timeline_id, force_l0_compaction=True, wait_until_uploaded=True
)
for _ in range(n_compactions):
# Force refresh gc info to have gc_cutoff generated
ps_http.timeline_gc(tenant_id, timeline_id, None)

View File

@@ -443,7 +443,7 @@ def test_heatmap_uploads(neon_env_builder: NeonEnvBuilder):
workload.write_rows(256, env.pageservers[0].id)
env.pageserver.http_client().tenant_heatmap_upload(tenant_id)
def validate_heatmap(heatmap):
def validate_heatmap(heatmap, on_disk_heatmap):
assert len(heatmap["timelines"]) == 1
assert heatmap["timelines"][0]["timeline_id"] == str(timeline_id)
assert len(heatmap["timelines"][0]["layers"]) > 0
@@ -452,10 +452,13 @@ def test_heatmap_uploads(neon_env_builder: NeonEnvBuilder):
# Each layer appears at most once
assert len(set(layer["name"] for layer in layers)) == len(layers)
assert heatmap == on_disk_heatmap
# Download and inspect the heatmap that the pageserver uploaded
heatmap_first = env.pageserver_remote_storage.heatmap_content(tenant_id)
heatmap_first_on_disk = env.pageserver.heatmap_content(tenant_id)
log.info(f"Read back heatmap: {heatmap_first}")
validate_heatmap(heatmap_first)
validate_heatmap(heatmap_first, heatmap_first_on_disk)
# Do some more I/O to generate more layers
workload.churn_rows(64, env.pageservers[0].id)
@@ -463,9 +466,10 @@ def test_heatmap_uploads(neon_env_builder: NeonEnvBuilder):
# Ensure that another heatmap upload includes the new layers
heatmap_second = env.pageserver_remote_storage.heatmap_content(tenant_id)
heatmap_second_on_disk = env.pageserver.heatmap_content(tenant_id)
log.info(f"Read back heatmap: {heatmap_second}")
assert heatmap_second != heatmap_first
validate_heatmap(heatmap_second)
validate_heatmap(heatmap_second, heatmap_second_on_disk)
def list_elegible_layers(

View File

@@ -25,6 +25,7 @@ camino = { version = "1", default-features = false, features = ["serde1"] }
chrono = { version = "0.4", default-features = false, features = ["clock", "serde", "wasmbind"] }
clap = { version = "4", features = ["derive", "env", "string"] }
clap_builder = { version = "4", default-features = false, features = ["color", "env", "help", "std", "string", "suggestions", "usage"] }
crossbeam-utils = { version = "0.8" }
crypto-bigint = { version = "0.5", features = ["generic-array", "zeroize"] }
der = { version = "0.7", default-features = false, features = ["oid", "pem", "std"] }
deranged = { version = "0.3", default-features = false, features = ["powerfmt", "serde", "std"] }
@@ -40,11 +41,12 @@ generic-array = { version = "0.14", default-features = false, features = ["more_
getrandom = { version = "0.2", default-features = false, features = ["std"] }
half = { version = "2", default-features = false, features = ["num-traits"] }
hashbrown = { version = "0.14", features = ["raw"] }
hdrhistogram = { version = "7" }
hex = { version = "0.4", features = ["serde"] }
hmac = { version = "0.12", default-features = false, features = ["reset"] }
hyper-582f2526e08bb6a0 = { package = "hyper", version = "0.14", features = ["full"] }
hyper-dff4ba8e3ae991db = { package = "hyper", version = "1", features = ["full"] }
hyper-util = { version = "0.1", features = ["client-legacy", "http1", "http2", "server", "service"] }
hyper-util = { version = "0.1", features = ["client-legacy", "server-auto", "service"] }
indexmap-dff4ba8e3ae991db = { package = "indexmap", version = "1", default-features = false, features = ["std"] }
indexmap-f595c2ba2a3f28df = { package = "indexmap", version = "2", features = ["serde"] }
itertools = { version = "0.12" }
@@ -83,15 +85,18 @@ sync_wrapper = { version = "0.1", default-features = false, features = ["futures
tikv-jemalloc-ctl = { version = "0.6", features = ["stats", "use_std"] }
tikv-jemalloc-sys = { version = "0.6", features = ["profiling", "stats", "unprefixed_malloc_on_supported_platforms"] }
time = { version = "0.3", features = ["macros", "serde-well-known"] }
tokio = { version = "1", features = ["full", "test-util"] }
tokio = { version = "1", features = ["full", "test-util", "tracing"] }
tokio-rustls = { version = "0.26", default-features = false, features = ["logging", "ring", "tls12"] }
tokio-stream = { version = "0.1" }
tokio-stream = { version = "0.1", features = ["net"] }
tokio-util = { version = "0.7", features = ["codec", "compat", "io", "rt"] }
toml_edit = { version = "0.22", features = ["serde"] }
tonic = { version = "0.12", default-features = false, features = ["codegen", "prost", "tls-roots"] }
tower = { version = "0.4", default-features = false, features = ["balance", "buffer", "limit", "util"] }
tonic = { version = "0.12", features = ["tls-roots"] }
tower-9fbad63c4bcf4a8f = { package = "tower", version = "0.4", default-features = false, features = ["balance", "buffer", "limit", "util"] }
tower-d8f496e17d97b5cb = { package = "tower", version = "0.5", default-features = false, features = ["log", "make", "util"] }
tracing = { version = "0.1", features = ["log"] }
tracing-core = { version = "0.1" }
tracing-log = { version = "0.2" }
tracing-subscriber = { version = "0.3", default-features = false, features = ["env-filter", "fmt", "json", "parking_lot", "smallvec", "tracing-log"] }
url = { version = "2", features = ["serde"] }
zerocopy = { version = "0.7", features = ["derive", "simd"] }
zeroize = { version = "1", features = ["derive", "serde"] }