mirror of
https://github.com/neondatabase/neon.git
synced 2026-02-06 12:10:37 +00:00
Compare commits
29 Commits
sk-proto-v
...
bodobolero
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d0d29468d8 | ||
|
|
567a665dc4 | ||
|
|
b0007302d0 | ||
|
|
b35dd198c3 | ||
|
|
b66fbd6176 | ||
|
|
95588dab98 | ||
|
|
1686d9e733 | ||
|
|
abcd00181c | ||
|
|
01f0be03b5 | ||
|
|
81cd30e4d6 | ||
|
|
7fc6953da4 | ||
|
|
77f9e74d86 | ||
|
|
0ceeec9be3 | ||
|
|
733a57247b | ||
|
|
6699a30a49 | ||
|
|
133b89a83d | ||
|
|
fba22a7123 | ||
|
|
14e05276a3 | ||
|
|
ebc55e6ae8 | ||
|
|
f07119cca7 | ||
|
|
47975d06d9 | ||
|
|
472007dd7c | ||
|
|
f9009d6b80 | ||
|
|
cab60b6d9f | ||
|
|
06090bbccd | ||
|
|
dcf335a251 | ||
|
|
b6e9daea9a | ||
|
|
d5c3a4e2b9 | ||
|
|
8107140f7f |
@@ -24,3 +24,4 @@
|
||||
!storage_controller/
|
||||
!vendor/postgres-*/
|
||||
!workspace_hack/
|
||||
!build_tools/patches
|
||||
|
||||
2
.github/workflows/build_and_test.yml
vendored
2
.github/workflows/build_and_test.yml
vendored
@@ -682,7 +682,7 @@ jobs:
|
||||
push: true
|
||||
pull: true
|
||||
file: compute/compute-node.Dockerfile
|
||||
target: neon-pg-ext-test
|
||||
target: extension-tests
|
||||
cache-from: type=registry,ref=cache.neon.build/compute-node-${{ matrix.version.pg }}:cache-${{ matrix.version.debian }}-${{ matrix.arch }}
|
||||
tags: |
|
||||
neondatabase/neon-test-extensions-${{ matrix.version.pg }}:${{needs.tag.outputs.build-tag}}-${{ matrix.version.debian }}-${{ matrix.arch }}
|
||||
|
||||
55
Cargo.lock
generated
55
Cargo.lock
generated
@@ -206,6 +206,16 @@ dependencies = [
|
||||
"syn 2.0.90",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "assert-json-diff"
|
||||
version = "2.0.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "47e4f2b81832e72834d7518d8487a0396a28cc408186a2e8854c0f98011faf12"
|
||||
dependencies = [
|
||||
"serde",
|
||||
"serde_json",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "async-channel"
|
||||
version = "1.9.0"
|
||||
@@ -1010,6 +1020,12 @@ dependencies = [
|
||||
"generic-array",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "boxcar"
|
||||
version = "0.2.8"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "2721c3c5a6f0e7f7e607125d963fedeb765f545f67adc9d71ed934693881eb42"
|
||||
|
||||
[[package]]
|
||||
name = "bstr"
|
||||
version = "1.5.0"
|
||||
@@ -2433,6 +2449,16 @@ dependencies = [
|
||||
"wasm-bindgen",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "gettid"
|
||||
version = "0.1.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "397256552fed4a9e577850498071831ec8f18ea83368aecc114cab469dcb43e5"
|
||||
dependencies = [
|
||||
"libc",
|
||||
"winapi",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "gimli"
|
||||
version = "0.31.1"
|
||||
@@ -4212,6 +4238,16 @@ dependencies = [
|
||||
"workspace_hack",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "papaya"
|
||||
version = "0.1.8"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "dc7c76487f7eaa00a0fc1d7f88dc6b295aec478d11b0fc79f857b62c2874124c"
|
||||
dependencies = [
|
||||
"equivalent",
|
||||
"seize",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "parking"
|
||||
version = "2.1.1"
|
||||
@@ -4839,6 +4875,7 @@ dependencies = [
|
||||
"ahash",
|
||||
"anyhow",
|
||||
"arc-swap",
|
||||
"assert-json-diff",
|
||||
"async-compression",
|
||||
"async-trait",
|
||||
"atomic-take",
|
||||
@@ -4846,6 +4883,7 @@ dependencies = [
|
||||
"aws-sdk-iam",
|
||||
"aws-sigv4",
|
||||
"base64 0.13.1",
|
||||
"boxcar",
|
||||
"bstr",
|
||||
"bytes",
|
||||
"camino",
|
||||
@@ -4862,6 +4900,7 @@ dependencies = [
|
||||
"flate2",
|
||||
"framed-websockets",
|
||||
"futures",
|
||||
"gettid",
|
||||
"hashbrown 0.14.5",
|
||||
"hashlink",
|
||||
"hex",
|
||||
@@ -4884,7 +4923,9 @@ dependencies = [
|
||||
"measured",
|
||||
"metrics",
|
||||
"once_cell",
|
||||
"opentelemetry",
|
||||
"p256 0.13.2",
|
||||
"papaya",
|
||||
"parking_lot 0.12.1",
|
||||
"parquet",
|
||||
"parquet_derive",
|
||||
@@ -4931,6 +4972,9 @@ dependencies = [
|
||||
"tokio-tungstenite 0.21.0",
|
||||
"tokio-util",
|
||||
"tracing",
|
||||
"tracing-log",
|
||||
"tracing-opentelemetry",
|
||||
"tracing-serde",
|
||||
"tracing-subscriber",
|
||||
"tracing-utils",
|
||||
"try-lock",
|
||||
@@ -5884,6 +5928,16 @@ dependencies = [
|
||||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "seize"
|
||||
version = "0.4.9"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d84b0c858bdd30cb56f5597f8b3bf702ec23829e652cc636a1e5a7b9de46ae93"
|
||||
dependencies = [
|
||||
"libc",
|
||||
"windows-sys 0.52.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "semver"
|
||||
version = "1.0.17"
|
||||
@@ -8145,6 +8199,7 @@ dependencies = [
|
||||
"tower 0.4.13",
|
||||
"tracing",
|
||||
"tracing-core",
|
||||
"tracing-log",
|
||||
"url",
|
||||
"zerocopy",
|
||||
"zeroize",
|
||||
|
||||
@@ -54,6 +54,7 @@ async-compression = { version = "0.4.0", features = ["tokio", "gzip", "zstd"] }
|
||||
atomic-take = "1.1.0"
|
||||
backtrace = "0.3.74"
|
||||
flate2 = "1.0.26"
|
||||
assert-json-diff = "2"
|
||||
async-stream = "0.3"
|
||||
async-trait = "0.1"
|
||||
aws-config = { version = "1.5", default-features = false, features=["rustls", "sso"] }
|
||||
@@ -193,7 +194,9 @@ tower-http = { version = "0.6.2", features = ["request-id", "trace"] }
|
||||
tower-service = "0.3.3"
|
||||
tracing = "0.1"
|
||||
tracing-error = "0.2"
|
||||
tracing-log = "0.2"
|
||||
tracing-opentelemetry = "0.28"
|
||||
tracing-serde = "0.2.0"
|
||||
tracing-subscriber = { version = "0.3", default-features = false, features = ["smallvec", "fmt", "tracing-log", "std", "env-filter", "json"] }
|
||||
try-lock = "0.2.5"
|
||||
twox-hash = { version = "1.6.3", default-features = false }
|
||||
|
||||
@@ -3,10 +3,17 @@ ARG DEBIAN_VERSION=bookworm
|
||||
FROM debian:bookworm-slim AS pgcopydb_builder
|
||||
ARG DEBIAN_VERSION
|
||||
|
||||
# Use strict mode for bash to catch errors early
|
||||
SHELL ["/bin/bash", "-euo", "pipefail", "-c"]
|
||||
|
||||
# By default, /bin/sh used in debian images will treat '\n' as eol,
|
||||
# but as we use bash as SHELL, and built-in echo in bash requires '-e' flag for that.
|
||||
RUN echo 'Acquire::Retries "5";' > /etc/apt/apt.conf.d/80-retries && \
|
||||
echo -e "retry_connrefused = on\ntimeout=15\ntries=5\n" > /root/.wgetrc \
|
||||
echo -e "retry_connrefused = on\ntimeout=15\ntries=5\n" > /root/.wgetrc && \
|
||||
echo -e "--retry-connrefused\n--connect-timeout 15\n--retry 5\n--max-time 300\n" > /root/.curlrc
|
||||
|
||||
COPY build_tools/patches/pgcopydbv017.patch /pgcopydbv017.patch
|
||||
|
||||
RUN if [ "${DEBIAN_VERSION}" = "bookworm" ]; then \
|
||||
set -e && \
|
||||
apt update && \
|
||||
@@ -39,6 +46,7 @@ RUN if [ "${DEBIAN_VERSION}" = "bookworm" ]; then \
|
||||
mkdir /tmp/pgcopydb && \
|
||||
tar -xzf /tmp/pgcopydb.tar.gz -C /tmp/pgcopydb --strip-components=1 && \
|
||||
cd /tmp/pgcopydb && \
|
||||
patch -p1 < /pgcopydbv017.patch && \
|
||||
make -s clean && \
|
||||
make -s -j12 install && \
|
||||
libpq_path=$(find /lib /usr/lib -name "libpq.so.5" | head -n 1) && \
|
||||
@@ -55,7 +63,8 @@ ARG DEBIAN_VERSION
|
||||
|
||||
# Add nonroot user
|
||||
RUN useradd -ms /bin/bash nonroot -b /home
|
||||
SHELL ["/bin/bash", "-c"]
|
||||
# Use strict mode for bash to catch errors early
|
||||
SHELL ["/bin/bash", "-euo", "pipefail", "-c"]
|
||||
|
||||
RUN mkdir -p /pgcopydb/bin && \
|
||||
mkdir -p /pgcopydb/lib && \
|
||||
@@ -66,7 +75,7 @@ COPY --from=pgcopydb_builder /usr/lib/postgresql/16/bin/pgcopydb /pgcopydb/bin/p
|
||||
COPY --from=pgcopydb_builder /pgcopydb/lib/libpq.so.5 /pgcopydb/lib/libpq.so.5
|
||||
|
||||
RUN echo 'Acquire::Retries "5";' > /etc/apt/apt.conf.d/80-retries && \
|
||||
echo -e "retry_connrefused = on\ntimeout=15\ntries=5\n" > /root/.wgetrc \
|
||||
echo -e "retry_connrefused = on\ntimeout=15\ntries=5\n" > /root/.wgetrc && \
|
||||
echo -e "--retry-connrefused\n--connect-timeout 15\n--retry 5\n--max-time 300\n" > /root/.curlrc
|
||||
|
||||
# System deps
|
||||
@@ -190,8 +199,14 @@ RUN set -e \
|
||||
# It includes several bug fixes on top on v2.0 release (https://github.com/linux-test-project/lcov/compare/v2.0...master)
|
||||
# And patches from us:
|
||||
# - Generates json file with code coverage summary (https://github.com/neondatabase/lcov/commit/426e7e7a22f669da54278e9b55e6d8caabd00af0.tar.gz)
|
||||
RUN for package in Capture::Tiny DateTime Devel::Cover Digest::MD5 File::Spec JSON::XS Memory::Process Time::HiRes JSON; do yes | perl -MCPAN -e "CPAN::Shell->notest('install', '$package')"; done \
|
||||
&& wget https://github.com/neondatabase/lcov/archive/426e7e7a22f669da54278e9b55e6d8caabd00af0.tar.gz -O lcov.tar.gz \
|
||||
RUN set +o pipefail && \
|
||||
for package in Capture::Tiny DateTime Devel::Cover Digest::MD5 File::Spec JSON::XS Memory::Process Time::HiRes JSON; do \
|
||||
yes | perl -MCPAN -e "CPAN::Shell->notest('install', '$package')";\
|
||||
done && \
|
||||
set -o pipefail
|
||||
# Split into separate step to debug flaky failures here
|
||||
RUN wget https://github.com/neondatabase/lcov/archive/426e7e7a22f669da54278e9b55e6d8caabd00af0.tar.gz -O lcov.tar.gz \
|
||||
&& ls -laht lcov.tar.gz && sha256sum lcov.tar.gz \
|
||||
&& echo "61a22a62e20908b8b9e27d890bd0ea31f567a7b9668065589266371dcbca0992 lcov.tar.gz" | sha256sum --check \
|
||||
&& mkdir -p lcov && tar -xzf lcov.tar.gz -C lcov --strip-components=1 \
|
||||
&& cd lcov \
|
||||
|
||||
57
build_tools/patches/pgcopydbv017.patch
Normal file
57
build_tools/patches/pgcopydbv017.patch
Normal file
@@ -0,0 +1,57 @@
|
||||
diff --git a/src/bin/pgcopydb/copydb.c b/src/bin/pgcopydb/copydb.c
|
||||
index d730b03..69a9be9 100644
|
||||
--- a/src/bin/pgcopydb/copydb.c
|
||||
+++ b/src/bin/pgcopydb/copydb.c
|
||||
@@ -44,6 +44,7 @@ GUC dstSettings[] = {
|
||||
{ "synchronous_commit", "'off'" },
|
||||
{ "statement_timeout", "0" },
|
||||
{ "lock_timeout", "0" },
|
||||
+ { "idle_in_transaction_session_timeout", "0" },
|
||||
{ NULL, NULL },
|
||||
};
|
||||
|
||||
diff --git a/src/bin/pgcopydb/pgsql.c b/src/bin/pgcopydb/pgsql.c
|
||||
index 94f2f46..e051ba8 100644
|
||||
--- a/src/bin/pgcopydb/pgsql.c
|
||||
+++ b/src/bin/pgcopydb/pgsql.c
|
||||
@@ -2319,6 +2319,11 @@ pgsql_execute_log_error(PGSQL *pgsql,
|
||||
|
||||
LinesBuffer lbuf = { 0 };
|
||||
|
||||
+ if (message != NULL){
|
||||
+ // make sure message is writable by splitLines
|
||||
+ message = strdup(message);
|
||||
+ }
|
||||
+
|
||||
if (!splitLines(&lbuf, message))
|
||||
{
|
||||
/* errors have already been logged */
|
||||
@@ -2332,6 +2337,7 @@ pgsql_execute_log_error(PGSQL *pgsql,
|
||||
PQbackendPID(pgsql->connection),
|
||||
lbuf.lines[lineNumber]);
|
||||
}
|
||||
+ free(message); // free copy of message we created above
|
||||
|
||||
if (pgsql->logSQL)
|
||||
{
|
||||
@@ -3174,11 +3180,18 @@ pgcopy_log_error(PGSQL *pgsql, PGresult *res, const char *context)
|
||||
/* errors have already been logged */
|
||||
return;
|
||||
}
|
||||
-
|
||||
if (res != NULL)
|
||||
{
|
||||
char *sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
|
||||
- strlcpy(pgsql->sqlstate, sqlstate, sizeof(pgsql->sqlstate));
|
||||
+ if (sqlstate == NULL)
|
||||
+ {
|
||||
+ // PQresultErrorField returned NULL!
|
||||
+ pgsql->sqlstate[0] = '\0'; // Set to an empty string to avoid segfault
|
||||
+ }
|
||||
+ else
|
||||
+ {
|
||||
+ strlcpy(pgsql->sqlstate, sqlstate, sizeof(pgsql->sqlstate));
|
||||
+ }
|
||||
}
|
||||
|
||||
char *endpoint =
|
||||
File diff suppressed because it is too large
Load Diff
@@ -32,6 +32,7 @@ reason = "the marvin attack only affects private key decryption, not public key
|
||||
# https://embarkstudios.github.io/cargo-deny/checks/licenses/cfg.html
|
||||
[licenses]
|
||||
allow = [
|
||||
"0BSD",
|
||||
"Apache-2.0",
|
||||
"BSD-2-Clause",
|
||||
"BSD-3-Clause",
|
||||
|
||||
@@ -52,6 +52,7 @@ for pg_version in ${TEST_VERSION_ONLY-14 15 16 17}; do
|
||||
|
||||
if [ $pg_version -ge 16 ]; then
|
||||
docker cp ext-src $TEST_CONTAINER_NAME:/
|
||||
docker exec $TEST_CONTAINER_NAME bash -c "apt update && apt install -y libtap-parser-sourcehandler-pgtap-perl"
|
||||
# This is required for the pg_hint_plan test, to prevent flaky log message causing the test to fail
|
||||
# It cannot be moved to Dockerfile now because the database directory is created after the start of the container
|
||||
echo Adding dummy config
|
||||
|
||||
4
docker-compose/ext-src/pgjwt-src/neon-test.sh
Executable file
4
docker-compose/ext-src/pgjwt-src/neon-test.sh
Executable file
@@ -0,0 +1,4 @@
|
||||
#!/bin/bash
|
||||
set -ex
|
||||
cd "$(dirname "${0}")"
|
||||
pg_prove test.sql
|
||||
15
docker-compose/ext-src/pgjwt-src/test-upgrade.patch
Normal file
15
docker-compose/ext-src/pgjwt-src/test-upgrade.patch
Normal file
@@ -0,0 +1,15 @@
|
||||
diff --git a/test.sql b/test.sql
|
||||
index d7a0ca8..f15bc76 100644
|
||||
--- a/test.sql
|
||||
+++ b/test.sql
|
||||
@@ -9,9 +9,7 @@
|
||||
\set ON_ERROR_STOP true
|
||||
\set QUIET 1
|
||||
|
||||
-CREATE EXTENSION pgcrypto;
|
||||
-CREATE EXTENSION pgtap;
|
||||
-CREATE EXTENSION pgjwt;
|
||||
+CREATE EXTENSION IF NOT EXISTS pgtap;
|
||||
|
||||
BEGIN;
|
||||
SELECT plan(23);
|
||||
5
docker-compose/ext-src/pgjwt-src/test-upgrade.sh
Executable file
5
docker-compose/ext-src/pgjwt-src/test-upgrade.sh
Executable file
@@ -0,0 +1,5 @@
|
||||
#!/bin/sh
|
||||
set -ex
|
||||
cd "$(dirname ${0})"
|
||||
patch -p1 <test-upgrade.patch
|
||||
pg_prove test.sql
|
||||
@@ -24,7 +24,7 @@ function wait_for_ready {
|
||||
}
|
||||
function create_extensions() {
|
||||
for ext in ${1}; do
|
||||
docker compose exec neon-test-extensions psql -X -v ON_ERROR_STOP=1 -d contrib_regression -c "CREATE EXTENSION IF NOT EXISTS ${ext}"
|
||||
docker compose exec neon-test-extensions psql -X -v ON_ERROR_STOP=1 -d contrib_regression -c "CREATE EXTENSION IF NOT EXISTS ${ext} CASCADE"
|
||||
done
|
||||
}
|
||||
EXTENSIONS='[
|
||||
@@ -40,7 +40,8 @@ EXTENSIONS='[
|
||||
{"extname": "pg_uuidv7", "extdir": "pg_uuidv7-src"},
|
||||
{"extname": "roaringbitmap", "extdir": "pg_roaringbitmap-src"},
|
||||
{"extname": "semver", "extdir": "pg_semver-src"},
|
||||
{"extname": "pg_ivm", "extdir": "pg_ivm-src"}
|
||||
{"extname": "pg_ivm", "extdir": "pg_ivm-src"},
|
||||
{"extname": "pgjwt", "extdir": "pgjwt-src"}
|
||||
]'
|
||||
EXTNAMES=$(echo ${EXTENSIONS} | jq -r '.[].extname' | paste -sd ' ' -)
|
||||
TAG=${NEWTAG} docker compose --profile test-extensions up --quiet-pull --build -d
|
||||
|
||||
@@ -204,14 +204,16 @@ impl RemoteExtSpec {
|
||||
|
||||
// Check if extension is present in public or custom.
|
||||
// If not, then it is not allowed to be used by this compute.
|
||||
if let Some(public_extensions) = &self.public_extensions {
|
||||
if !public_extensions.contains(&real_ext_name.to_string()) {
|
||||
if let Some(custom_extensions) = &self.custom_extensions {
|
||||
if !custom_extensions.contains(&real_ext_name.to_string()) {
|
||||
return Err(anyhow::anyhow!("extension {} is not found", real_ext_name));
|
||||
}
|
||||
}
|
||||
}
|
||||
if !self
|
||||
.public_extensions
|
||||
.as_ref()
|
||||
.is_some_and(|exts| exts.iter().any(|e| e == ext_name))
|
||||
&& !self
|
||||
.custom_extensions
|
||||
.as_ref()
|
||||
.is_some_and(|exts| exts.iter().any(|e| e == ext_name))
|
||||
{
|
||||
return Err(anyhow::anyhow!("extension {} is not found", real_ext_name));
|
||||
}
|
||||
|
||||
match self.extension_data.get(real_ext_name) {
|
||||
@@ -340,6 +342,96 @@ mod tests {
|
||||
use super::*;
|
||||
use std::fs::File;
|
||||
|
||||
#[test]
|
||||
fn allow_installing_remote_extensions() {
|
||||
let rspec: RemoteExtSpec = serde_json::from_value(serde_json::json!({
|
||||
"public_extensions": null,
|
||||
"custom_extensions": null,
|
||||
"library_index": {},
|
||||
"extension_data": {},
|
||||
}))
|
||||
.unwrap();
|
||||
|
||||
rspec
|
||||
.get_ext("ext", false, "latest", "v17")
|
||||
.expect_err("Extension should not be found");
|
||||
|
||||
let rspec: RemoteExtSpec = serde_json::from_value(serde_json::json!({
|
||||
"public_extensions": [],
|
||||
"custom_extensions": null,
|
||||
"library_index": {},
|
||||
"extension_data": {},
|
||||
}))
|
||||
.unwrap();
|
||||
|
||||
rspec
|
||||
.get_ext("ext", false, "latest", "v17")
|
||||
.expect_err("Extension should not be found");
|
||||
|
||||
let rspec: RemoteExtSpec = serde_json::from_value(serde_json::json!({
|
||||
"public_extensions": [],
|
||||
"custom_extensions": [],
|
||||
"library_index": {
|
||||
"ext": "ext"
|
||||
},
|
||||
"extension_data": {
|
||||
"ext": {
|
||||
"control_data": {
|
||||
"ext.control": ""
|
||||
},
|
||||
"archive_path": ""
|
||||
}
|
||||
},
|
||||
}))
|
||||
.unwrap();
|
||||
|
||||
rspec
|
||||
.get_ext("ext", false, "latest", "v17")
|
||||
.expect_err("Extension should not be found");
|
||||
|
||||
let rspec: RemoteExtSpec = serde_json::from_value(serde_json::json!({
|
||||
"public_extensions": [],
|
||||
"custom_extensions": ["ext"],
|
||||
"library_index": {
|
||||
"ext": "ext"
|
||||
},
|
||||
"extension_data": {
|
||||
"ext": {
|
||||
"control_data": {
|
||||
"ext.control": ""
|
||||
},
|
||||
"archive_path": ""
|
||||
}
|
||||
},
|
||||
}))
|
||||
.unwrap();
|
||||
|
||||
rspec
|
||||
.get_ext("ext", false, "latest", "v17")
|
||||
.expect("Extension should be found");
|
||||
|
||||
let rspec: RemoteExtSpec = serde_json::from_value(serde_json::json!({
|
||||
"public_extensions": ["ext"],
|
||||
"custom_extensions": [],
|
||||
"library_index": {
|
||||
"ext": "ext"
|
||||
},
|
||||
"extension_data": {
|
||||
"ext": {
|
||||
"control_data": {
|
||||
"ext.control": ""
|
||||
},
|
||||
"archive_path": ""
|
||||
}
|
||||
},
|
||||
}))
|
||||
.unwrap();
|
||||
|
||||
rspec
|
||||
.get_ext("ext", false, "latest", "v17")
|
||||
.expect("Extension should be found");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_spec_file() {
|
||||
let file = File::open("tests/cluster_spec.json").unwrap();
|
||||
|
||||
@@ -7,7 +7,7 @@ use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::{
|
||||
DEFAULT_MAX_KEYS_PER_LIST_RESPONSE, DEFAULT_REMOTE_STORAGE_AZURE_CONCURRENCY_LIMIT,
|
||||
DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT,
|
||||
DEFAULT_REMOTE_STORAGE_LOCALFS_CONCURRENCY_LIMIT, DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT,
|
||||
};
|
||||
|
||||
/// External backup storage configuration, enough for creating a client for that storage.
|
||||
@@ -45,11 +45,11 @@ impl RemoteStorageKind {
|
||||
|
||||
impl RemoteStorageConfig {
|
||||
/// Helper to fetch the configured concurrency limit.
|
||||
pub fn concurrency_limit(&self) -> Option<usize> {
|
||||
pub fn concurrency_limit(&self) -> usize {
|
||||
match &self.storage {
|
||||
RemoteStorageKind::LocalFs { .. } => None,
|
||||
RemoteStorageKind::AwsS3(c) => Some(c.concurrency_limit.into()),
|
||||
RemoteStorageKind::AzureContainer(c) => Some(c.concurrency_limit.into()),
|
||||
RemoteStorageKind::LocalFs { .. } => DEFAULT_REMOTE_STORAGE_LOCALFS_CONCURRENCY_LIMIT,
|
||||
RemoteStorageKind::AwsS3(c) => c.concurrency_limit.into(),
|
||||
RemoteStorageKind::AzureContainer(c) => c.concurrency_limit.into(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -65,6 +65,12 @@ pub const DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT: usize = 100;
|
||||
/// Here, a limit of max 20k concurrent connections was noted.
|
||||
/// <https://learn.microsoft.com/en-us/answers/questions/1301863/is-there-any-limitation-to-concurrent-connections>
|
||||
pub const DEFAULT_REMOTE_STORAGE_AZURE_CONCURRENCY_LIMIT: usize = 100;
|
||||
/// Set this limit analogously to the S3 limit.
|
||||
///
|
||||
/// The local filesystem backend doesn't enforce a concurrency limit itself, but this also bounds
|
||||
/// the upload queue concurrency. Some tests create thousands of uploads, which slows down the
|
||||
/// quadratic scheduling of the upload queue, and there is no point spawning so many Tokio tasks.
|
||||
pub const DEFAULT_REMOTE_STORAGE_LOCALFS_CONCURRENCY_LIMIT: usize = 100;
|
||||
/// No limits on the client side, which currenltly means 1000 for AWS S3.
|
||||
/// <https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html#API_ListObjectsV2_RequestSyntax>
|
||||
pub const DEFAULT_MAX_KEYS_PER_LIST_RESPONSE: Option<i32> = None;
|
||||
|
||||
@@ -32,6 +32,7 @@ use utils::id::TimelineId;
|
||||
|
||||
use crate::config::PageServerConf;
|
||||
use crate::context::{PageContentKind, RequestContext};
|
||||
use crate::pgdatadir_mapping::DatadirModificationStats;
|
||||
use crate::task_mgr::TaskKind;
|
||||
use crate::tenant::layer_map::LayerMap;
|
||||
use crate::tenant::mgr::TenantSlot;
|
||||
@@ -2378,11 +2379,40 @@ pub(crate) struct WalIngestMetrics {
|
||||
pub(crate) records_observed: IntCounter,
|
||||
pub(crate) records_committed: IntCounter,
|
||||
pub(crate) records_filtered: IntCounter,
|
||||
pub(crate) values_committed_metadata_images: IntCounter,
|
||||
pub(crate) values_committed_metadata_deltas: IntCounter,
|
||||
pub(crate) values_committed_data_images: IntCounter,
|
||||
pub(crate) values_committed_data_deltas: IntCounter,
|
||||
pub(crate) gap_blocks_zeroed_on_rel_extend: IntCounter,
|
||||
pub(crate) clear_vm_bits_unknown: IntCounterVec,
|
||||
}
|
||||
|
||||
impl WalIngestMetrics {
|
||||
pub(crate) fn inc_values_committed(&self, stats: &DatadirModificationStats) {
|
||||
if stats.metadata_images > 0 {
|
||||
self.values_committed_metadata_images
|
||||
.inc_by(stats.metadata_images);
|
||||
}
|
||||
if stats.metadata_deltas > 0 {
|
||||
self.values_committed_metadata_deltas
|
||||
.inc_by(stats.metadata_deltas);
|
||||
}
|
||||
if stats.data_images > 0 {
|
||||
self.values_committed_data_images.inc_by(stats.data_images);
|
||||
}
|
||||
if stats.data_deltas > 0 {
|
||||
self.values_committed_data_deltas.inc_by(stats.data_deltas);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) static WAL_INGEST: Lazy<WalIngestMetrics> = Lazy::new(|| {
|
||||
let values_committed = register_int_counter_vec!(
|
||||
"pageserver_wal_ingest_values_committed",
|
||||
"Number of values committed to pageserver storage from WAL records",
|
||||
&["class", "kind"],
|
||||
)
|
||||
.expect("failed to define a metric");
|
||||
|
||||
WalIngestMetrics {
|
||||
bytes_received: register_int_counter!(
|
||||
"pageserver_wal_ingest_bytes_received",
|
||||
@@ -2409,17 +2439,15 @@ pub(crate) static WAL_INGEST: Lazy<WalIngestMetrics> = Lazy::new(|| {
|
||||
"Number of WAL records filtered out due to sharding"
|
||||
)
|
||||
.expect("failed to define a metric"),
|
||||
values_committed_metadata_images: values_committed.with_label_values(&["metadata", "image"]),
|
||||
values_committed_metadata_deltas: values_committed.with_label_values(&["metadata", "delta"]),
|
||||
values_committed_data_images: values_committed.with_label_values(&["data", "image"]),
|
||||
values_committed_data_deltas: values_committed.with_label_values(&["data", "delta"]),
|
||||
gap_blocks_zeroed_on_rel_extend: register_int_counter!(
|
||||
"pageserver_gap_blocks_zeroed_on_rel_extend",
|
||||
"Total number of zero gap blocks written on relation extends"
|
||||
)
|
||||
.expect("failed to define a metric"),
|
||||
clear_vm_bits_unknown: register_int_counter_vec!(
|
||||
"pageserver_wal_ingest_clear_vm_bits_unknown",
|
||||
"Number of ignored ClearVmBits operations due to unknown pages/relations",
|
||||
&["entity"],
|
||||
)
|
||||
.expect("failed to define a metric"),
|
||||
}
|
||||
});
|
||||
|
||||
|
||||
@@ -1280,8 +1280,6 @@ impl PageServerHandler {
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
// and log the info! line inside the request span
|
||||
.instrument(span.clone())
|
||||
.await?;
|
||||
}
|
||||
Ok(())
|
||||
@@ -2037,6 +2035,12 @@ impl PageServerHandler {
|
||||
.get(tenant_id, timeline_id, ShardSelector::Zero)
|
||||
.await?;
|
||||
|
||||
if timeline.is_archived() == Some(true) {
|
||||
// TODO after a grace period, turn this log line into a hard error
|
||||
tracing::warn!("timeline {tenant_id}/{timeline_id} is archived, but got basebackup request for it.");
|
||||
//return Err(QueryError::NotFound("timeline is archived".into()))
|
||||
}
|
||||
|
||||
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
|
||||
if let Some(lsn) = lsn {
|
||||
// Backup was requested at a particular LSN. Wait for it to arrive.
|
||||
|
||||
@@ -48,7 +48,7 @@ use tracing::{debug, trace, warn};
|
||||
use utils::bin_ser::DeserializeError;
|
||||
use utils::pausable_failpoint;
|
||||
use utils::{bin_ser::BeSer, lsn::Lsn};
|
||||
use wal_decoder::serialized_batch::SerializedValueBatch;
|
||||
use wal_decoder::serialized_batch::{SerializedValueBatch, ValueMeta};
|
||||
|
||||
/// Max delta records appended to the AUX_FILES_KEY (for aux v1). The write path will write a full image once this threshold is reached.
|
||||
pub const MAX_AUX_FILE_DELTAS: usize = 1024;
|
||||
@@ -1297,6 +1297,26 @@ impl DatadirModification<'_> {
|
||||
.is_some_and(|b| b.has_data())
|
||||
}
|
||||
|
||||
/// Returns statistics about the currently pending modifications.
|
||||
pub(crate) fn stats(&self) -> DatadirModificationStats {
|
||||
let mut stats = DatadirModificationStats::default();
|
||||
for (_, _, value) in self.pending_metadata_pages.values().flatten() {
|
||||
match value {
|
||||
Value::Image(_) => stats.metadata_images += 1,
|
||||
Value::WalRecord(r) if r.will_init() => stats.metadata_images += 1,
|
||||
Value::WalRecord(_) => stats.metadata_deltas += 1,
|
||||
}
|
||||
}
|
||||
for valuemeta in self.pending_data_batch.iter().flat_map(|b| &b.metadata) {
|
||||
match valuemeta {
|
||||
ValueMeta::Serialized(s) if s.will_init => stats.data_images += 1,
|
||||
ValueMeta::Serialized(_) => stats.data_deltas += 1,
|
||||
ValueMeta::Observed(_) => {}
|
||||
}
|
||||
}
|
||||
stats
|
||||
}
|
||||
|
||||
/// Set the current lsn
|
||||
pub(crate) fn set_lsn(&mut self, lsn: Lsn) -> anyhow::Result<()> {
|
||||
ensure!(
|
||||
@@ -2317,6 +2337,15 @@ impl DatadirModification<'_> {
|
||||
}
|
||||
}
|
||||
|
||||
/// Statistics for a DatadirModification.
|
||||
#[derive(Default)]
|
||||
pub struct DatadirModificationStats {
|
||||
pub metadata_images: u64,
|
||||
pub metadata_deltas: u64,
|
||||
pub data_images: u64,
|
||||
pub data_deltas: u64,
|
||||
}
|
||||
|
||||
/// This struct facilitates accessing either a committed key from the timeline at a
|
||||
/// specific LSN, or the latest uncommitted key from a pending modification.
|
||||
///
|
||||
|
||||
@@ -437,8 +437,7 @@ impl RemoteTimelineClient {
|
||||
.conf
|
||||
.remote_storage_config
|
||||
.as_ref()
|
||||
.and_then(|r| r.concurrency_limit())
|
||||
.unwrap_or(0);
|
||||
.map_or(0, |r| r.concurrency_limit());
|
||||
let mut upload_queue = self.upload_queue.lock().unwrap();
|
||||
upload_queue.initialize_with_current_remote_index_part(index_part, inprogress_limit)?;
|
||||
self.update_remote_physical_size_gauge(Some(index_part));
|
||||
@@ -461,8 +460,7 @@ impl RemoteTimelineClient {
|
||||
.conf
|
||||
.remote_storage_config
|
||||
.as_ref()
|
||||
.and_then(|r| r.concurrency_limit())
|
||||
.unwrap_or(0);
|
||||
.map_or(0, |r| r.concurrency_limit());
|
||||
let mut upload_queue = self.upload_queue.lock().unwrap();
|
||||
upload_queue.initialize_empty_remote(local_metadata, inprogress_limit)?;
|
||||
self.update_remote_physical_size_gauge(None);
|
||||
@@ -484,8 +482,7 @@ impl RemoteTimelineClient {
|
||||
.conf
|
||||
.remote_storage_config
|
||||
.as_ref()
|
||||
.and_then(|r| r.concurrency_limit())
|
||||
.unwrap_or(0);
|
||||
.map_or(0, |r| r.concurrency_limit());
|
||||
|
||||
let mut upload_queue = self.upload_queue.lock().unwrap();
|
||||
upload_queue.initialize_with_current_remote_index_part(index_part, inprogress_limit)?;
|
||||
|
||||
@@ -9,13 +9,14 @@ use crate::{
|
||||
metrics::SECONDARY_MODE,
|
||||
tenant::{
|
||||
config::AttachmentMode,
|
||||
mgr::GetTenantError,
|
||||
mgr::TenantManager,
|
||||
mgr::{GetTenantError, TenantManager},
|
||||
remote_timeline_client::remote_heatmap_path,
|
||||
span::debug_assert_current_span_has_tenant_id,
|
||||
tasks::{warn_when_period_overrun, BackgroundLoopKind},
|
||||
Tenant,
|
||||
},
|
||||
virtual_file::VirtualFile,
|
||||
TEMP_FILE_SUFFIX,
|
||||
};
|
||||
|
||||
use futures::Future;
|
||||
@@ -32,7 +33,10 @@ use super::{
|
||||
};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{info_span, instrument, Instrument};
|
||||
use utils::{backoff, completion::Barrier, yielding_loop::yielding_loop};
|
||||
use utils::{
|
||||
backoff, completion::Barrier, crashsafe::path_with_suffix_extension,
|
||||
yielding_loop::yielding_loop,
|
||||
};
|
||||
|
||||
pub(super) async fn heatmap_uploader_task(
|
||||
tenant_manager: Arc<TenantManager>,
|
||||
@@ -461,6 +465,18 @@ async fn upload_tenant_heatmap(
|
||||
}
|
||||
}
|
||||
|
||||
// After a successful upload persist the fresh heatmap to disk.
|
||||
// When restarting, the tenant will read the heatmap from disk
|
||||
// and additively generate a new heatmap (see [`Timeline::generate_heatmap`]).
|
||||
// If the heatmap is stale, the additive generation can lead to keeping previously
|
||||
// evicted timelines on the secondarie's disk.
|
||||
let tenant_shard_id = tenant.get_tenant_shard_id();
|
||||
let heatmap_path = tenant.conf.tenant_heatmap_path(tenant_shard_id);
|
||||
let temp_path = path_with_suffix_extension(&heatmap_path, TEMP_FILE_SUFFIX);
|
||||
if let Err(err) = VirtualFile::crashsafe_overwrite(heatmap_path, temp_path, bytes).await {
|
||||
tracing::warn!("Non fatal IO error writing to disk after heatmap upload: {err}");
|
||||
}
|
||||
|
||||
tracing::info!("Successfully uploaded {size} byte heatmap to {path}");
|
||||
|
||||
Ok(UploadHeatmapOutcome::Uploaded(LastUploadState {
|
||||
|
||||
@@ -211,7 +211,7 @@ async fn compaction_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
|
||||
error_run_count = 0;
|
||||
// schedule the next compaction immediately in case there is a pending compaction task
|
||||
sleep_duration = if let CompactionOutcome::Pending = outcome {
|
||||
Duration::ZERO
|
||||
Duration::from_secs(1)
|
||||
} else {
|
||||
period
|
||||
};
|
||||
|
||||
@@ -192,7 +192,12 @@ pub enum ImageLayerCreationMode {
|
||||
|
||||
#[derive(Clone, Debug, Default)]
|
||||
pub enum LastImageLayerCreationStatus {
|
||||
Incomplete, // TODO: record the last key being processed
|
||||
Incomplete {
|
||||
/// The last key of the partition (exclusive) that was processed in the last
|
||||
/// image layer creation attempt. We will continue from this key in the next
|
||||
/// attempt.
|
||||
last_key: Key,
|
||||
},
|
||||
Complete,
|
||||
#[default]
|
||||
Initial,
|
||||
@@ -4346,7 +4351,7 @@ impl Timeline {
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
// Is it time to create a new image layer for the given partition?
|
||||
// Is it time to create a new image layer for the given partition? True if we want to generate.
|
||||
async fn time_for_new_image_layer(&self, partition: &KeySpace, lsn: Lsn) -> bool {
|
||||
let threshold = self.get_image_creation_threshold();
|
||||
|
||||
@@ -4658,6 +4663,11 @@ impl Timeline {
|
||||
) -> Result<(Vec<ResidentLayer>, LastImageLayerCreationStatus), CreateImageLayersError> {
|
||||
let timer = self.metrics.create_images_time_histo.start_timer();
|
||||
|
||||
if partitioning.parts.is_empty() {
|
||||
warn!("no partitions to create image layers for");
|
||||
return Ok((vec![], LastImageLayerCreationStatus::Complete));
|
||||
}
|
||||
|
||||
// We need to avoid holes between generated image layers.
|
||||
// Otherwise LayerMap::image_layer_exists will return false if key range of some layer is covered by more than one
|
||||
// image layer with hole between them. In this case such layer can not be utilized by GC.
|
||||
@@ -4669,28 +4679,65 @@ impl Timeline {
|
||||
// image layers <100000000..100000099> and <200000000..200000199> are not completely covering it.
|
||||
let mut start = Key::MIN;
|
||||
|
||||
let check_for_image_layers = if let LastImageLayerCreationStatus::Incomplete = last_status {
|
||||
info!(
|
||||
"resuming image layer creation: last_status={:?}",
|
||||
last_status
|
||||
);
|
||||
true
|
||||
} else {
|
||||
self.should_check_if_image_layers_required(lsn)
|
||||
};
|
||||
let check_for_image_layers =
|
||||
if let LastImageLayerCreationStatus::Incomplete { last_key } = last_status {
|
||||
info!(
|
||||
"resuming image layer creation: last_status=incomplete, continue from {}",
|
||||
last_key
|
||||
);
|
||||
true
|
||||
} else {
|
||||
self.should_check_if_image_layers_required(lsn)
|
||||
};
|
||||
|
||||
let mut batch_image_writer = BatchLayerWriter::new(self.conf).await?;
|
||||
|
||||
let mut all_generated = true;
|
||||
|
||||
let mut partition_processed = 0;
|
||||
let total_partitions = partitioning.parts.len();
|
||||
let mut total_partitions = partitioning.parts.len();
|
||||
let mut last_partition_processed = None;
|
||||
let mut partition_parts = partitioning.parts.clone();
|
||||
|
||||
for partition in partitioning.parts.iter() {
|
||||
if let LastImageLayerCreationStatus::Incomplete { last_key } = last_status {
|
||||
// We need to skip the partitions that have already been processed.
|
||||
let mut found = false;
|
||||
for (i, partition) in partition_parts.iter().enumerate() {
|
||||
if last_key <= partition.end().unwrap() {
|
||||
// ```plain
|
||||
// |------|--------|----------|------|
|
||||
// ^last_key
|
||||
// ^start from this partition
|
||||
// ```
|
||||
// Why `i+1` instead of `i`?
|
||||
// It is possible that the user did some writes after the previous image layer creation attempt so that
|
||||
// a relation grows in size, and the last_key is now in the middle of the partition. In this case, we
|
||||
// still want to skip this partition, so that we can make progress and avoid generating image layers over
|
||||
// the same partition. Doing a mod to ensure we don't end up with an empty vec.
|
||||
if i + 1 >= total_partitions {
|
||||
// In general, this case should not happen -- if last_key is on the last partition, the previous
|
||||
// iteration of image layer creation should return a complete status.
|
||||
break; // with found=false
|
||||
}
|
||||
partition_parts = partition_parts.split_off(i + 1); // Remove the first i + 1 elements
|
||||
total_partitions = partition_parts.len();
|
||||
// Update the start key to the partition start.
|
||||
start = partition_parts[0].start().unwrap();
|
||||
found = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if !found {
|
||||
// Last key is within the last partition, or larger than all partitions.
|
||||
return Ok((vec![], LastImageLayerCreationStatus::Complete));
|
||||
}
|
||||
}
|
||||
|
||||
for partition in partition_parts.iter() {
|
||||
if self.cancel.is_cancelled() {
|
||||
return Err(CreateImageLayersError::Cancelled);
|
||||
}
|
||||
|
||||
partition_processed += 1;
|
||||
let img_range = start..partition.ranges.last().unwrap().end;
|
||||
let compact_metadata = partition.overlaps(&Key::metadata_key_range());
|
||||
if compact_metadata {
|
||||
@@ -4725,6 +4772,8 @@ impl Timeline {
|
||||
lsn_range: PersistentLayerDesc::image_layer_lsn_range(lsn),
|
||||
is_delta: false,
|
||||
}) {
|
||||
// TODO: this can be processed with the BatchLayerWriter::finish_with_discard
|
||||
// in the future.
|
||||
tracing::info!(
|
||||
"Skipping image layer at {lsn} {}..{}, already exists",
|
||||
img_range.start,
|
||||
@@ -4805,8 +4854,6 @@ impl Timeline {
|
||||
}
|
||||
}
|
||||
|
||||
partition_processed += 1;
|
||||
|
||||
if let ImageLayerCreationMode::Try = mode {
|
||||
// We have at least made some progress
|
||||
if batch_image_writer.pending_layer_num() >= 1 {
|
||||
@@ -4822,8 +4869,10 @@ impl Timeline {
|
||||
* self.get_compaction_threshold();
|
||||
if image_preempt_threshold != 0 && num_of_l0_layers >= image_preempt_threshold {
|
||||
tracing::info!(
|
||||
"preempt image layer generation at {start} at {lsn}: too many L0 layers {num_of_l0_layers}",
|
||||
"preempt image layer generation at {lsn} when processing partition {}..{}: too many L0 layers {}",
|
||||
partition.start().unwrap(), partition.end().unwrap(), num_of_l0_layers
|
||||
);
|
||||
last_partition_processed = Some(partition.clone());
|
||||
all_generated = false;
|
||||
break;
|
||||
}
|
||||
@@ -4868,7 +4917,14 @@ impl Timeline {
|
||||
if all_generated {
|
||||
LastImageLayerCreationStatus::Complete
|
||||
} else {
|
||||
LastImageLayerCreationStatus::Incomplete
|
||||
LastImageLayerCreationStatus::Incomplete {
|
||||
last_key: if let Some(last_partition_processed) = last_partition_processed {
|
||||
last_partition_processed.end().unwrap_or(Key::MIN)
|
||||
} else {
|
||||
// This branch should be unreachable, but in case it happens, we can just return the start key.
|
||||
Key::MIN
|
||||
},
|
||||
}
|
||||
},
|
||||
))
|
||||
}
|
||||
|
||||
@@ -33,6 +33,7 @@ use crate::page_cache;
|
||||
use crate::statvfs::Statvfs;
|
||||
use crate::tenant::checks::check_valid_layermap;
|
||||
use crate::tenant::gc_block::GcBlock;
|
||||
use crate::tenant::layer_map::LayerMap;
|
||||
use crate::tenant::remote_timeline_client::WaitCompletionError;
|
||||
use crate::tenant::storage_layer::batch_split_writer::{
|
||||
BatchWriterResult, SplitDeltaLayerWriter, SplitImageLayerWriter,
|
||||
@@ -438,6 +439,11 @@ impl KeyHistoryRetention {
|
||||
if dry_run {
|
||||
return true;
|
||||
}
|
||||
if LayerMap::is_l0(&key.key_range, key.is_delta) {
|
||||
// gc-compaction should not produce L0 deltas, otherwise it will break the layer order.
|
||||
// We should ignore such layers.
|
||||
return true;
|
||||
}
|
||||
let layer_generation;
|
||||
{
|
||||
let guard = tline.layers.read().await;
|
||||
@@ -748,7 +754,7 @@ impl Timeline {
|
||||
.store(Arc::new(outcome.clone()));
|
||||
|
||||
self.upload_new_image_layers(image_layers)?;
|
||||
if let LastImageLayerCreationStatus::Incomplete = outcome {
|
||||
if let LastImageLayerCreationStatus::Incomplete { .. } = outcome {
|
||||
// Yield and do not do any other kind of compaction.
|
||||
info!("skipping shard ancestor compaction due to pending image layer generation tasks (preempted by L0 compaction).");
|
||||
return Ok(CompactionOutcome::Pending);
|
||||
|
||||
@@ -355,6 +355,19 @@ pub(super) async fn handle_walreceiver_connection(
|
||||
// advances it to its end LSN. 0 is just an initialization placeholder.
|
||||
let mut modification = timeline.begin_modification(Lsn(0));
|
||||
|
||||
async fn commit(
|
||||
modification: &mut DatadirModification<'_>,
|
||||
ctx: &RequestContext,
|
||||
uncommitted: &mut u64,
|
||||
) -> anyhow::Result<()> {
|
||||
let stats = modification.stats();
|
||||
modification.commit(ctx).await?;
|
||||
WAL_INGEST.records_committed.inc_by(*uncommitted);
|
||||
WAL_INGEST.inc_values_committed(&stats);
|
||||
*uncommitted = 0;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
if !records.is_empty() {
|
||||
timeline
|
||||
.metrics
|
||||
@@ -366,8 +379,7 @@ pub(super) async fn handle_walreceiver_connection(
|
||||
if matches!(interpreted.flush_uncommitted, FlushUncommittedRecords::Yes)
|
||||
&& uncommitted_records > 0
|
||||
{
|
||||
modification.commit(&ctx).await?;
|
||||
uncommitted_records = 0;
|
||||
commit(&mut modification, &ctx, &mut uncommitted_records).await?;
|
||||
}
|
||||
|
||||
let local_next_record_lsn = interpreted.next_record_lsn;
|
||||
@@ -396,8 +408,7 @@ pub(super) async fn handle_walreceiver_connection(
|
||||
|| modification.approx_pending_bytes()
|
||||
> DatadirModification::MAX_PENDING_BYTES
|
||||
{
|
||||
modification.commit(&ctx).await?;
|
||||
uncommitted_records = 0;
|
||||
commit(&mut modification, &ctx, &mut uncommitted_records).await?;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -415,7 +426,7 @@ pub(super) async fn handle_walreceiver_connection(
|
||||
|
||||
if uncommitted_records > 0 || needs_last_record_lsn_advance {
|
||||
// Commit any uncommitted records
|
||||
modification.commit(&ctx).await?;
|
||||
commit(&mut modification, &ctx, &mut uncommitted_records).await?;
|
||||
}
|
||||
|
||||
if !caught_up && streaming_lsn >= end_of_wal {
|
||||
@@ -442,10 +453,12 @@ pub(super) async fn handle_walreceiver_connection(
|
||||
filtered: &mut u64,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
let stats = modification.stats();
|
||||
modification.commit(ctx).await?;
|
||||
WAL_INGEST
|
||||
.records_committed
|
||||
.inc_by(*uncommitted - *filtered);
|
||||
modification.commit(ctx).await?;
|
||||
WAL_INGEST.inc_values_committed(&stats);
|
||||
*uncommitted = 0;
|
||||
*filtered = 0;
|
||||
Ok(())
|
||||
|
||||
@@ -28,17 +28,9 @@ use std::time::Duration;
|
||||
use std::time::Instant;
|
||||
use std::time::SystemTime;
|
||||
|
||||
use pageserver_api::shard::ShardIdentity;
|
||||
use postgres_ffi::fsm_logical_to_physical;
|
||||
use postgres_ffi::walrecord::*;
|
||||
use postgres_ffi::{dispatch_pgversion, enum_pgversion, enum_pgversion_dispatch, TimestampTz};
|
||||
use wal_decoder::models::*;
|
||||
|
||||
use anyhow::{bail, Result};
|
||||
use bytes::{Buf, Bytes};
|
||||
use tracing::*;
|
||||
use utils::failpoint_support;
|
||||
use utils::rate_limit::RateLimit;
|
||||
|
||||
use crate::context::RequestContext;
|
||||
use crate::metrics::WAL_INGEST;
|
||||
@@ -50,11 +42,18 @@ use crate::ZERO_PAGE;
|
||||
use pageserver_api::key::rel_block_to_key;
|
||||
use pageserver_api::record::NeonWalRecord;
|
||||
use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind};
|
||||
use pageserver_api::shard::ShardIdentity;
|
||||
use postgres_ffi::fsm_logical_to_physical;
|
||||
use postgres_ffi::pg_constants;
|
||||
use postgres_ffi::relfile_utils::{FSM_FORKNUM, INIT_FORKNUM, MAIN_FORKNUM, VISIBILITYMAP_FORKNUM};
|
||||
use postgres_ffi::walrecord::*;
|
||||
use postgres_ffi::TransactionId;
|
||||
use postgres_ffi::{dispatch_pgversion, enum_pgversion, enum_pgversion_dispatch, TimestampTz};
|
||||
use utils::bin_ser::SerializeError;
|
||||
use utils::lsn::Lsn;
|
||||
use utils::rate_limit::RateLimit;
|
||||
use utils::{critical, failpoint_support};
|
||||
use wal_decoder::models::*;
|
||||
|
||||
enum_pgversion! {CheckPoint, pgv::CheckPoint}
|
||||
|
||||
@@ -327,93 +326,75 @@ impl WalIngest {
|
||||
let mut new_vm_blk = new_heap_blkno.map(pg_constants::HEAPBLK_TO_MAPBLOCK);
|
||||
let mut old_vm_blk = old_heap_blkno.map(pg_constants::HEAPBLK_TO_MAPBLOCK);
|
||||
|
||||
// Sometimes, Postgres seems to create heap WAL records with the
|
||||
// ALL_VISIBLE_CLEARED flag set, even though the bit in the VM page is
|
||||
// not set. In fact, it's possible that the VM page does not exist at all.
|
||||
// In that case, we don't want to store a record to clear the VM bit;
|
||||
// replaying it would fail to find the previous image of the page, because
|
||||
// it doesn't exist. So check if the VM page(s) exist, and skip the WAL
|
||||
// record if it doesn't.
|
||||
//
|
||||
// TODO: analyze the metrics and tighten this up accordingly. This logic
|
||||
// implicitly assumes that VM pages see explicit WAL writes before
|
||||
// implicit ClearVmBits, and will otherwise silently drop updates.
|
||||
// VM bits can only be cleared on the shard(s) owning the VM relation, and must be within
|
||||
// its view of the VM relation size. Out of caution, error instead of failing WAL ingestion,
|
||||
// as there has historically been cases where PostgreSQL has cleared spurious VM pages. See:
|
||||
// https://github.com/neondatabase/neon/pull/10634.
|
||||
let Some(vm_size) = get_relsize(modification, vm_rel, ctx).await? else {
|
||||
WAL_INGEST
|
||||
.clear_vm_bits_unknown
|
||||
.with_label_values(&["relation"])
|
||||
.inc();
|
||||
critical!("clear_vm_bits for unknown VM relation {vm_rel}");
|
||||
return Ok(());
|
||||
};
|
||||
if let Some(blknum) = new_vm_blk {
|
||||
if blknum >= vm_size {
|
||||
WAL_INGEST
|
||||
.clear_vm_bits_unknown
|
||||
.with_label_values(&["new_page"])
|
||||
.inc();
|
||||
critical!("new_vm_blk {blknum} not in {vm_rel} of size {vm_size}");
|
||||
new_vm_blk = None;
|
||||
}
|
||||
}
|
||||
if let Some(blknum) = old_vm_blk {
|
||||
if blknum >= vm_size {
|
||||
WAL_INGEST
|
||||
.clear_vm_bits_unknown
|
||||
.with_label_values(&["old_page"])
|
||||
.inc();
|
||||
critical!("old_vm_blk {blknum} not in {vm_rel} of size {vm_size}");
|
||||
old_vm_blk = None;
|
||||
}
|
||||
}
|
||||
|
||||
if new_vm_blk.is_some() || old_vm_blk.is_some() {
|
||||
if new_vm_blk == old_vm_blk {
|
||||
// An UPDATE record that needs to clear the bits for both old and the
|
||||
// new page, both of which reside on the same VM page.
|
||||
if new_vm_blk.is_none() && old_vm_blk.is_none() {
|
||||
return Ok(());
|
||||
} else if new_vm_blk == old_vm_blk {
|
||||
// An UPDATE record that needs to clear the bits for both old and the new page, both of
|
||||
// which reside on the same VM page.
|
||||
self.put_rel_wal_record(
|
||||
modification,
|
||||
vm_rel,
|
||||
new_vm_blk.unwrap(),
|
||||
NeonWalRecord::ClearVisibilityMapFlags {
|
||||
new_heap_blkno,
|
||||
old_heap_blkno,
|
||||
flags,
|
||||
},
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
} else {
|
||||
// Clear VM bits for one heap page, or for two pages that reside on different VM pages.
|
||||
if let Some(new_vm_blk) = new_vm_blk {
|
||||
self.put_rel_wal_record(
|
||||
modification,
|
||||
vm_rel,
|
||||
new_vm_blk.unwrap(),
|
||||
new_vm_blk,
|
||||
NeonWalRecord::ClearVisibilityMapFlags {
|
||||
new_heap_blkno,
|
||||
old_heap_blkno: None,
|
||||
flags,
|
||||
},
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
if let Some(old_vm_blk) = old_vm_blk {
|
||||
self.put_rel_wal_record(
|
||||
modification,
|
||||
vm_rel,
|
||||
old_vm_blk,
|
||||
NeonWalRecord::ClearVisibilityMapFlags {
|
||||
new_heap_blkno: None,
|
||||
old_heap_blkno,
|
||||
flags,
|
||||
},
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
} else {
|
||||
// Clear VM bits for one heap page, or for two pages that reside on
|
||||
// different VM pages.
|
||||
if let Some(new_vm_blk) = new_vm_blk {
|
||||
self.put_rel_wal_record(
|
||||
modification,
|
||||
vm_rel,
|
||||
new_vm_blk,
|
||||
NeonWalRecord::ClearVisibilityMapFlags {
|
||||
new_heap_blkno,
|
||||
old_heap_blkno: None,
|
||||
flags,
|
||||
},
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
if let Some(old_vm_blk) = old_vm_blk {
|
||||
self.put_rel_wal_record(
|
||||
modification,
|
||||
vm_rel,
|
||||
old_vm_blk,
|
||||
NeonWalRecord::ClearVisibilityMapFlags {
|
||||
new_heap_blkno: None,
|
||||
old_heap_blkno,
|
||||
flags,
|
||||
},
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
@@ -509,47 +509,44 @@ lfc_cache_containsv(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
|
||||
|
||||
CriticalAssert(BufTagGetRelNumber(&tag) != InvalidRelFileNumber);
|
||||
|
||||
tag.blockNum = (blkno + i) & ~(BLOCKS_PER_CHUNK - 1);
|
||||
tag.blockNum = blkno & ~(BLOCKS_PER_CHUNK - 1);
|
||||
hash = get_hash_value(lfc_hash, &tag);
|
||||
chunk_offs = (blkno + i) & (BLOCKS_PER_CHUNK - 1);
|
||||
chunk_offs = blkno & (BLOCKS_PER_CHUNK - 1);
|
||||
|
||||
LWLockAcquire(lfc_lock, LW_SHARED);
|
||||
|
||||
if (!LFC_ENABLED())
|
||||
{
|
||||
LWLockRelease(lfc_lock);
|
||||
return 0;
|
||||
}
|
||||
while (true)
|
||||
{
|
||||
int this_chunk = Min(nblocks, BLOCKS_PER_CHUNK - chunk_offs);
|
||||
if (LFC_ENABLED())
|
||||
{
|
||||
entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_FIND, NULL);
|
||||
int this_chunk = Min(nblocks - i, BLOCKS_PER_CHUNK - chunk_offs);
|
||||
entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_FIND, NULL);
|
||||
|
||||
if (entry != NULL)
|
||||
if (entry != NULL)
|
||||
{
|
||||
for (; chunk_offs < BLOCKS_PER_CHUNK && i < nblocks; chunk_offs++, i++)
|
||||
{
|
||||
for (; chunk_offs < BLOCKS_PER_CHUNK && i < nblocks; chunk_offs++, i++)
|
||||
if ((entry->bitmap[chunk_offs >> 5] &
|
||||
((uint32)1 << (chunk_offs & 31))) != 0)
|
||||
{
|
||||
if ((entry->bitmap[chunk_offs >> 5] &
|
||||
((uint32)1 << (chunk_offs & 31))) != 0)
|
||||
{
|
||||
BITMAP_SET(bitmap, i);
|
||||
found++;
|
||||
}
|
||||
BITMAP_SET(bitmap, i);
|
||||
found++;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
i += this_chunk;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
LWLockRelease(lfc_lock);
|
||||
return found;
|
||||
i += this_chunk;
|
||||
}
|
||||
|
||||
/*
|
||||
* Break out of the iteration before doing expensive stuff for
|
||||
* a next iteration
|
||||
*/
|
||||
if (i + 1 >= nblocks)
|
||||
if (i >= nblocks)
|
||||
break;
|
||||
|
||||
/*
|
||||
@@ -563,8 +560,8 @@ lfc_cache_containsv(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
|
||||
|
||||
LWLockRelease(lfc_lock);
|
||||
|
||||
#if USE_ASSERT_CHECKING
|
||||
do {
|
||||
#ifdef USE_ASSERT_CHECKING
|
||||
{
|
||||
int count = 0;
|
||||
|
||||
for (int j = 0; j < nblocks; j++)
|
||||
@@ -574,7 +571,7 @@ lfc_cache_containsv(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
|
||||
}
|
||||
|
||||
Assert(count == found);
|
||||
} while (false);
|
||||
}
|
||||
#endif
|
||||
|
||||
return found;
|
||||
|
||||
@@ -36,6 +36,11 @@
|
||||
#include "pagestore_client.h"
|
||||
#include "walproposer.h"
|
||||
|
||||
#ifdef __linux__
|
||||
#include <sys/ioctl.h>
|
||||
#include <linux/sockios.h>
|
||||
#endif
|
||||
|
||||
#define PageStoreTrace DEBUG5
|
||||
|
||||
#define MIN_RECONNECT_INTERVAL_USEC 1000
|
||||
@@ -728,11 +733,36 @@ retry:
|
||||
INSTR_TIME_SUBTRACT(since_last_log, last_log_ts);
|
||||
if (INSTR_TIME_GET_MILLISEC(since_last_log) >= LOG_INTERVAL_MS)
|
||||
{
|
||||
int sndbuf = -1;
|
||||
int recvbuf = -1;
|
||||
#ifdef __linux__
|
||||
int socketfd;
|
||||
#endif
|
||||
|
||||
since_start = now;
|
||||
INSTR_TIME_SUBTRACT(since_start, start_ts);
|
||||
neon_shard_log(shard_no, LOG, "no response received from pageserver for %0.3f s, still waiting (sent " UINT64_FORMAT " requests, received " UINT64_FORMAT " responses)",
|
||||
|
||||
#ifdef __linux__
|
||||
/*
|
||||
* get kernel's send and recv queue size via ioctl
|
||||
* https://elixir.bootlin.com/linux/v6.1.128/source/include/uapi/linux/sockios.h#L25-L27
|
||||
*/
|
||||
socketfd = PQsocket(pageserver_conn);
|
||||
if (socketfd != -1) {
|
||||
int ioctl_err;
|
||||
ioctl_err = ioctl(socketfd, SIOCOUTQ, &sndbuf);
|
||||
if (ioctl_err!= 0) {
|
||||
sndbuf = -errno;
|
||||
}
|
||||
ioctl_err = ioctl(socketfd, FIONREAD, &recvbuf);
|
||||
if (ioctl_err != 0) {
|
||||
recvbuf = -errno;
|
||||
}
|
||||
}
|
||||
#endif
|
||||
neon_shard_log(shard_no, LOG, "no response received from pageserver for %0.3f s, still waiting (sent " UINT64_FORMAT " requests, received " UINT64_FORMAT " responses) (socket sndbuf=%d recvbuf=%d)",
|
||||
INSTR_TIME_GET_DOUBLE(since_start),
|
||||
shard->nrequests_sent, shard->nresponses_received);
|
||||
shard->nrequests_sent, shard->nresponses_received, sndbuf, recvbuf);
|
||||
last_log_ts = now;
|
||||
logged = true;
|
||||
}
|
||||
|
||||
@@ -916,7 +916,7 @@ prefetch_register_bufferv(BufferTag tag, neon_request_lsns *frlsns,
|
||||
{
|
||||
uint64 min_ring_index;
|
||||
PrefetchRequest hashkey;
|
||||
#if USE_ASSERT_CHECKING
|
||||
#ifdef USE_ASSERT_CHECKING
|
||||
bool any_hits = false;
|
||||
#endif
|
||||
/* We will never read further ahead than our buffer can store. */
|
||||
@@ -955,7 +955,7 @@ Retry:
|
||||
else
|
||||
lsns = NULL;
|
||||
|
||||
#if USE_ASSERT_CHECKING
|
||||
#ifdef USE_ASSERT_CHECKING
|
||||
any_hits = true;
|
||||
#endif
|
||||
|
||||
|
||||
@@ -19,6 +19,7 @@ aws-config.workspace = true
|
||||
aws-sdk-iam.workspace = true
|
||||
aws-sigv4.workspace = true
|
||||
base64.workspace = true
|
||||
boxcar = "0.2.8"
|
||||
bstr.workspace = true
|
||||
bytes = { workspace = true, features = ["serde"] }
|
||||
camino.workspace = true
|
||||
@@ -42,6 +43,7 @@ hyper0.workspace = true
|
||||
hyper = { workspace = true, features = ["server", "http1", "http2"] }
|
||||
hyper-util = { version = "0.1", features = ["server", "http1", "http2", "tokio"] }
|
||||
http-body-util = { version = "0.1" }
|
||||
gettid = "0.1.3"
|
||||
indexmap = { workspace = true, features = ["serde"] }
|
||||
ipnet.workspace = true
|
||||
itertools.workspace = true
|
||||
@@ -50,6 +52,8 @@ lasso = { workspace = true, features = ["multi-threaded"] }
|
||||
measured = { workspace = true, features = ["lasso"] }
|
||||
metrics.workspace = true
|
||||
once_cell.workspace = true
|
||||
opentelemetry = { workspace = true, features = ["trace"] }
|
||||
papaya = "0.1.8"
|
||||
parking_lot.workspace = true
|
||||
parquet.workspace = true
|
||||
parquet_derive.workspace = true
|
||||
@@ -89,6 +93,9 @@ tokio = { workspace = true, features = ["signal"] }
|
||||
tracing-subscriber.workspace = true
|
||||
tracing-utils.workspace = true
|
||||
tracing.workspace = true
|
||||
tracing-log.workspace = true
|
||||
tracing-serde.workspace = true
|
||||
tracing-opentelemetry.workspace = true
|
||||
try-lock.workspace = true
|
||||
typed-json.workspace = true
|
||||
url.workspace = true
|
||||
@@ -112,6 +119,7 @@ rsa = "0.9"
|
||||
workspace_hack.workspace = true
|
||||
|
||||
[dev-dependencies]
|
||||
assert-json-diff.workspace = true
|
||||
camino-tempfile.workspace = true
|
||||
fallible-iterator.workspace = true
|
||||
flate2.workspace = true
|
||||
|
||||
@@ -1,10 +1,23 @@
|
||||
use tracing::Subscriber;
|
||||
use std::cell::{Cell, RefCell};
|
||||
use std::collections::HashMap;
|
||||
use std::hash::BuildHasher;
|
||||
use std::{env, io};
|
||||
|
||||
use chrono::{DateTime, Utc};
|
||||
use opentelemetry::trace::TraceContextExt;
|
||||
use scopeguard::defer;
|
||||
use serde::ser::{SerializeMap, Serializer};
|
||||
use tracing::span;
|
||||
use tracing::subscriber::Interest;
|
||||
use tracing::{callsite, Event, Metadata, Span, Subscriber};
|
||||
use tracing_opentelemetry::OpenTelemetrySpanExt;
|
||||
use tracing_subscriber::filter::{EnvFilter, LevelFilter};
|
||||
use tracing_subscriber::fmt::format::{Format, Full};
|
||||
use tracing_subscriber::fmt::time::SystemTime;
|
||||
use tracing_subscriber::fmt::{FormatEvent, FormatFields};
|
||||
use tracing_subscriber::layer::{Context, Layer};
|
||||
use tracing_subscriber::prelude::*;
|
||||
use tracing_subscriber::registry::LookupSpan;
|
||||
use tracing_subscriber::registry::{LookupSpan, SpanRef};
|
||||
|
||||
/// Initialize logging and OpenTelemetry tracing and exporter.
|
||||
///
|
||||
@@ -15,6 +28,8 @@ use tracing_subscriber::registry::LookupSpan;
|
||||
/// destination, set `OTEL_EXPORTER_OTLP_ENDPOINT=http://jaeger:4318`.
|
||||
/// See <https://opentelemetry.io/docs/reference/specification/sdk-environment-variables>
|
||||
pub async fn init() -> anyhow::Result<LoggingGuard> {
|
||||
let logfmt = LogFormat::from_env()?;
|
||||
|
||||
let env_filter = EnvFilter::builder()
|
||||
.with_default_directive(LevelFilter::INFO.into())
|
||||
.from_env_lossy()
|
||||
@@ -29,17 +44,36 @@ pub async fn init() -> anyhow::Result<LoggingGuard> {
|
||||
.expect("this should be a valid filter directive"),
|
||||
);
|
||||
|
||||
let fmt_layer = tracing_subscriber::fmt::layer()
|
||||
.with_ansi(false)
|
||||
.with_writer(std::io::stderr)
|
||||
.with_target(false);
|
||||
|
||||
let otlp_layer = tracing_utils::init_tracing("proxy").await;
|
||||
|
||||
let json_log_layer = if logfmt == LogFormat::Json {
|
||||
Some(JsonLoggingLayer {
|
||||
clock: RealClock,
|
||||
skipped_field_indices: papaya::HashMap::default(),
|
||||
writer: StderrWriter {
|
||||
stderr: std::io::stderr(),
|
||||
},
|
||||
})
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
let text_log_layer = if logfmt == LogFormat::Text {
|
||||
Some(
|
||||
tracing_subscriber::fmt::layer()
|
||||
.with_ansi(false)
|
||||
.with_writer(std::io::stderr)
|
||||
.with_target(false),
|
||||
)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
tracing_subscriber::registry()
|
||||
.with(env_filter)
|
||||
.with(otlp_layer)
|
||||
.with(fmt_layer)
|
||||
.with(json_log_layer)
|
||||
.with(text_log_layer)
|
||||
.try_init()?;
|
||||
|
||||
Ok(LoggingGuard)
|
||||
@@ -94,3 +128,857 @@ impl Drop for LoggingGuard {
|
||||
tracing_utils::shutdown_tracing();
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: make JSON the default
|
||||
#[derive(Copy, Clone, PartialEq, Eq, Default, Debug)]
|
||||
enum LogFormat {
|
||||
#[default]
|
||||
Text = 1,
|
||||
Json,
|
||||
}
|
||||
|
||||
impl LogFormat {
|
||||
fn from_env() -> anyhow::Result<Self> {
|
||||
let logfmt = env::var("LOGFMT");
|
||||
Ok(match logfmt.as_deref() {
|
||||
Err(_) => LogFormat::default(),
|
||||
Ok("text") => LogFormat::Text,
|
||||
Ok("json") => LogFormat::Json,
|
||||
Ok(logfmt) => anyhow::bail!("unknown log format: {logfmt}"),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
trait MakeWriter {
|
||||
fn make_writer(&self) -> impl io::Write;
|
||||
}
|
||||
|
||||
struct StderrWriter {
|
||||
stderr: io::Stderr,
|
||||
}
|
||||
|
||||
impl MakeWriter for StderrWriter {
|
||||
#[inline]
|
||||
fn make_writer(&self) -> impl io::Write {
|
||||
self.stderr.lock()
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: move into separate module or even separate crate.
|
||||
trait Clock {
|
||||
fn now(&self) -> DateTime<Utc>;
|
||||
}
|
||||
|
||||
struct RealClock;
|
||||
|
||||
impl Clock for RealClock {
|
||||
#[inline]
|
||||
fn now(&self) -> DateTime<Utc> {
|
||||
Utc::now()
|
||||
}
|
||||
}
|
||||
|
||||
/// Name of the field used by tracing crate to store the event message.
|
||||
const MESSAGE_FIELD: &str = "message";
|
||||
|
||||
thread_local! {
|
||||
/// Protects against deadlocks and double panics during log writing.
|
||||
/// The current panic handler will use tracing to log panic information.
|
||||
static REENTRANCY_GUARD: Cell<bool> = const { Cell::new(false) };
|
||||
/// Thread-local instance with per-thread buffer for log writing.
|
||||
static EVENT_FORMATTER: RefCell<EventFormatter> = RefCell::new(EventFormatter::new());
|
||||
/// Cached OS thread ID.
|
||||
static THREAD_ID: u64 = gettid::gettid();
|
||||
}
|
||||
|
||||
/// Implements tracing layer to handle events specific to logging.
|
||||
struct JsonLoggingLayer<C: Clock, W: MakeWriter> {
|
||||
clock: C,
|
||||
skipped_field_indices: papaya::HashMap<callsite::Identifier, SkippedFieldIndices>,
|
||||
writer: W,
|
||||
}
|
||||
|
||||
impl<S, C: Clock + 'static, W: MakeWriter + 'static> Layer<S> for JsonLoggingLayer<C, W>
|
||||
where
|
||||
S: Subscriber + for<'a> LookupSpan<'a>,
|
||||
{
|
||||
fn on_event(&self, event: &Event<'_>, ctx: Context<'_, S>) {
|
||||
use std::io::Write;
|
||||
|
||||
// TODO: consider special tracing subscriber to grab timestamp very
|
||||
// early, before OTel machinery, and add as event extension.
|
||||
let now = self.clock.now();
|
||||
|
||||
let res: io::Result<()> = REENTRANCY_GUARD.with(move |entered| {
|
||||
if entered.get() {
|
||||
let mut formatter = EventFormatter::new();
|
||||
formatter.format(now, event, &ctx, &self.skipped_field_indices)?;
|
||||
self.writer.make_writer().write_all(formatter.buffer())
|
||||
} else {
|
||||
entered.set(true);
|
||||
defer!(entered.set(false););
|
||||
|
||||
EVENT_FORMATTER.with_borrow_mut(move |formatter| {
|
||||
formatter.reset();
|
||||
formatter.format(now, event, &ctx, &self.skipped_field_indices)?;
|
||||
self.writer.make_writer().write_all(formatter.buffer())
|
||||
})
|
||||
}
|
||||
});
|
||||
|
||||
// In case logging fails we generate a simpler JSON object.
|
||||
if let Err(err) = res {
|
||||
if let Ok(mut line) = serde_json::to_vec(&serde_json::json!( {
|
||||
"timestamp": now.to_rfc3339_opts(chrono::SecondsFormat::Micros, true),
|
||||
"level": "ERROR",
|
||||
"message": format_args!("cannot log event: {err:?}"),
|
||||
"fields": {
|
||||
"event": format_args!("{event:?}"),
|
||||
},
|
||||
})) {
|
||||
line.push(b'\n');
|
||||
self.writer.make_writer().write_all(&line).ok();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Registers a SpanFields instance as span extension.
|
||||
fn on_new_span(&self, attrs: &span::Attributes<'_>, id: &span::Id, ctx: Context<'_, S>) {
|
||||
let span = ctx.span(id).expect("span must exist");
|
||||
let fields = SpanFields::default();
|
||||
fields.record_fields(attrs);
|
||||
// This could deadlock when there's a panic somewhere in the tracing
|
||||
// event handling and a read or write guard is still held. This includes
|
||||
// the OTel subscriber.
|
||||
span.extensions_mut().insert(fields);
|
||||
}
|
||||
|
||||
fn on_record(&self, id: &span::Id, values: &span::Record<'_>, ctx: Context<'_, S>) {
|
||||
let span = ctx.span(id).expect("span must exist");
|
||||
let ext = span.extensions();
|
||||
if let Some(data) = ext.get::<SpanFields>() {
|
||||
data.record_fields(values);
|
||||
}
|
||||
}
|
||||
|
||||
/// Called (lazily) whenever a new log call is executed. We quickly check
|
||||
/// for duplicate field names and record duplicates as skippable. Last one
|
||||
/// wins.
|
||||
fn register_callsite(&self, metadata: &'static Metadata<'static>) -> Interest {
|
||||
if !metadata.is_event() {
|
||||
// Must not be never because we wouldn't get trace and span data.
|
||||
return Interest::always();
|
||||
}
|
||||
|
||||
let mut field_indices = SkippedFieldIndices::default();
|
||||
let mut seen_fields = HashMap::<&'static str, usize>::new();
|
||||
for field in metadata.fields() {
|
||||
use std::collections::hash_map::Entry;
|
||||
match seen_fields.entry(field.name()) {
|
||||
Entry::Vacant(entry) => {
|
||||
// field not seen yet
|
||||
entry.insert(field.index());
|
||||
}
|
||||
Entry::Occupied(mut entry) => {
|
||||
// replace currently stored index
|
||||
let old_index = entry.insert(field.index());
|
||||
// ... and append it to list of skippable indices
|
||||
field_indices.push(old_index);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if !field_indices.is_empty() {
|
||||
self.skipped_field_indices
|
||||
.pin()
|
||||
.insert(metadata.callsite(), field_indices);
|
||||
}
|
||||
|
||||
Interest::always()
|
||||
}
|
||||
}
|
||||
|
||||
/// Stores span field values recorded during the spans lifetime.
|
||||
#[derive(Default)]
|
||||
struct SpanFields {
|
||||
// TODO: Switch to custom enum with lasso::Spur for Strings?
|
||||
fields: papaya::HashMap<&'static str, serde_json::Value>,
|
||||
}
|
||||
|
||||
impl SpanFields {
|
||||
#[inline]
|
||||
fn record_fields<R: tracing_subscriber::field::RecordFields>(&self, fields: R) {
|
||||
fields.record(&mut SpanFieldsRecorder {
|
||||
fields: self.fields.pin(),
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
/// Implements a tracing field visitor to convert and store values.
|
||||
struct SpanFieldsRecorder<'m, S, G> {
|
||||
fields: papaya::HashMapRef<'m, &'static str, serde_json::Value, S, G>,
|
||||
}
|
||||
|
||||
impl<S: BuildHasher, G: papaya::Guard> tracing::field::Visit for SpanFieldsRecorder<'_, S, G> {
|
||||
#[inline]
|
||||
fn record_f64(&mut self, field: &tracing::field::Field, value: f64) {
|
||||
self.fields
|
||||
.insert(field.name(), serde_json::Value::from(value));
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn record_i64(&mut self, field: &tracing::field::Field, value: i64) {
|
||||
self.fields
|
||||
.insert(field.name(), serde_json::Value::from(value));
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
|
||||
self.fields
|
||||
.insert(field.name(), serde_json::Value::from(value));
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn record_i128(&mut self, field: &tracing::field::Field, value: i128) {
|
||||
if let Ok(value) = i64::try_from(value) {
|
||||
self.fields
|
||||
.insert(field.name(), serde_json::Value::from(value));
|
||||
} else {
|
||||
self.fields
|
||||
.insert(field.name(), serde_json::Value::from(format!("{value}")));
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn record_u128(&mut self, field: &tracing::field::Field, value: u128) {
|
||||
if let Ok(value) = u64::try_from(value) {
|
||||
self.fields
|
||||
.insert(field.name(), serde_json::Value::from(value));
|
||||
} else {
|
||||
self.fields
|
||||
.insert(field.name(), serde_json::Value::from(format!("{value}")));
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn record_bool(&mut self, field: &tracing::field::Field, value: bool) {
|
||||
self.fields
|
||||
.insert(field.name(), serde_json::Value::from(value));
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn record_bytes(&mut self, field: &tracing::field::Field, value: &[u8]) {
|
||||
self.fields
|
||||
.insert(field.name(), serde_json::Value::from(value));
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
|
||||
self.fields
|
||||
.insert(field.name(), serde_json::Value::from(value));
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
|
||||
self.fields
|
||||
.insert(field.name(), serde_json::Value::from(format!("{value:?}")));
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn record_error(
|
||||
&mut self,
|
||||
field: &tracing::field::Field,
|
||||
value: &(dyn std::error::Error + 'static),
|
||||
) {
|
||||
self.fields
|
||||
.insert(field.name(), serde_json::Value::from(format!("{value}")));
|
||||
}
|
||||
}
|
||||
|
||||
/// List of field indices skipped during logging. Can list duplicate fields or
|
||||
/// metafields not meant to be logged.
|
||||
#[derive(Clone, Default)]
|
||||
struct SkippedFieldIndices {
|
||||
bits: u64,
|
||||
}
|
||||
|
||||
impl SkippedFieldIndices {
|
||||
#[inline]
|
||||
fn is_empty(&self) -> bool {
|
||||
self.bits == 0
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn push(&mut self, index: usize) {
|
||||
self.bits |= 1u64
|
||||
.checked_shl(index as u32)
|
||||
.expect("field index too large");
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn contains(&self, index: usize) -> bool {
|
||||
self.bits
|
||||
& 1u64
|
||||
.checked_shl(index as u32)
|
||||
.expect("field index too large")
|
||||
!= 0
|
||||
}
|
||||
}
|
||||
|
||||
/// Formats a tracing event and writes JSON to its internal buffer including a newline.
|
||||
// TODO: buffer capacity management, truncate if too large
|
||||
struct EventFormatter {
|
||||
logline_buffer: Vec<u8>,
|
||||
}
|
||||
|
||||
impl EventFormatter {
|
||||
#[inline]
|
||||
fn new() -> Self {
|
||||
EventFormatter {
|
||||
logline_buffer: Vec::new(),
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn buffer(&self) -> &[u8] {
|
||||
&self.logline_buffer
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn reset(&mut self) {
|
||||
self.logline_buffer.clear();
|
||||
}
|
||||
|
||||
fn format<S>(
|
||||
&mut self,
|
||||
now: DateTime<Utc>,
|
||||
event: &Event<'_>,
|
||||
ctx: &Context<'_, S>,
|
||||
skipped_field_indices: &papaya::HashMap<callsite::Identifier, SkippedFieldIndices>,
|
||||
) -> io::Result<()>
|
||||
where
|
||||
S: Subscriber + for<'a> LookupSpan<'a>,
|
||||
{
|
||||
let timestamp = now.to_rfc3339_opts(chrono::SecondsFormat::Micros, true);
|
||||
|
||||
use tracing_log::NormalizeEvent;
|
||||
let normalized_meta = event.normalized_metadata();
|
||||
let meta = normalized_meta.as_ref().unwrap_or_else(|| event.metadata());
|
||||
|
||||
let skipped_field_indices = skipped_field_indices.pin();
|
||||
let skipped_field_indices = skipped_field_indices.get(&meta.callsite());
|
||||
|
||||
let mut serialize = || {
|
||||
let mut serializer = serde_json::Serializer::new(&mut self.logline_buffer);
|
||||
|
||||
let mut serializer = serializer.serialize_map(None)?;
|
||||
|
||||
// Timestamp comes first, so raw lines can be sorted by timestamp.
|
||||
serializer.serialize_entry("timestamp", ×tamp)?;
|
||||
|
||||
// Level next.
|
||||
serializer.serialize_entry("level", &meta.level().as_str())?;
|
||||
|
||||
// Message next.
|
||||
serializer.serialize_key("message")?;
|
||||
let mut message_extractor =
|
||||
MessageFieldExtractor::new(serializer, skipped_field_indices);
|
||||
event.record(&mut message_extractor);
|
||||
let mut serializer = message_extractor.into_serializer()?;
|
||||
|
||||
let mut fields_present = FieldsPresent(false, skipped_field_indices);
|
||||
event.record(&mut fields_present);
|
||||
if fields_present.0 {
|
||||
serializer.serialize_entry(
|
||||
"fields",
|
||||
&SerializableEventFields(event, skipped_field_indices),
|
||||
)?;
|
||||
}
|
||||
|
||||
let pid = std::process::id();
|
||||
if pid != 1 {
|
||||
serializer.serialize_entry("process_id", &pid)?;
|
||||
}
|
||||
|
||||
THREAD_ID.with(|tid| serializer.serialize_entry("thread_id", tid))?;
|
||||
|
||||
// TODO: tls cache? name could change
|
||||
if let Some(thread_name) = std::thread::current().name() {
|
||||
if !thread_name.is_empty() && thread_name != "tokio-runtime-worker" {
|
||||
serializer.serialize_entry("thread_name", thread_name)?;
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(task_id) = tokio::task::try_id() {
|
||||
serializer.serialize_entry("task_id", &format_args!("{task_id}"))?;
|
||||
}
|
||||
|
||||
serializer.serialize_entry("target", meta.target())?;
|
||||
|
||||
if let Some(module) = meta.module_path() {
|
||||
if module != meta.target() {
|
||||
serializer.serialize_entry("module", module)?;
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(file) = meta.file() {
|
||||
if let Some(line) = meta.line() {
|
||||
serializer.serialize_entry("src", &format_args!("{file}:{line}"))?;
|
||||
} else {
|
||||
serializer.serialize_entry("src", file)?;
|
||||
}
|
||||
}
|
||||
|
||||
{
|
||||
let otel_context = Span::current().context();
|
||||
let otel_spanref = otel_context.span();
|
||||
let span_context = otel_spanref.span_context();
|
||||
if span_context.is_valid() {
|
||||
serializer.serialize_entry(
|
||||
"trace_id",
|
||||
&format_args!("{}", span_context.trace_id()),
|
||||
)?;
|
||||
}
|
||||
}
|
||||
|
||||
serializer.serialize_entry("spans", &SerializableSpanStack(ctx))?;
|
||||
|
||||
serializer.end()
|
||||
};
|
||||
|
||||
serialize().map_err(io::Error::other)?;
|
||||
self.logline_buffer.push(b'\n');
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// Extracts the message field that's mixed will other fields.
|
||||
struct MessageFieldExtractor<'a, S: serde::ser::SerializeMap> {
|
||||
serializer: S,
|
||||
skipped_field_indices: Option<&'a SkippedFieldIndices>,
|
||||
state: Option<Result<(), S::Error>>,
|
||||
}
|
||||
|
||||
impl<'a, S: serde::ser::SerializeMap> MessageFieldExtractor<'a, S> {
|
||||
#[inline]
|
||||
fn new(serializer: S, skipped_field_indices: Option<&'a SkippedFieldIndices>) -> Self {
|
||||
Self {
|
||||
serializer,
|
||||
skipped_field_indices,
|
||||
state: None,
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn into_serializer(mut self) -> Result<S, S::Error> {
|
||||
match self.state {
|
||||
Some(Ok(())) => {}
|
||||
Some(Err(err)) => return Err(err),
|
||||
None => self.serializer.serialize_value("")?,
|
||||
}
|
||||
Ok(self.serializer)
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn accept_field(&self, field: &tracing::field::Field) -> bool {
|
||||
self.state.is_none()
|
||||
&& field.name() == MESSAGE_FIELD
|
||||
&& !self
|
||||
.skipped_field_indices
|
||||
.is_some_and(|i| i.contains(field.index()))
|
||||
}
|
||||
}
|
||||
|
||||
impl<S: serde::ser::SerializeMap> tracing::field::Visit for MessageFieldExtractor<'_, S> {
|
||||
#[inline]
|
||||
fn record_f64(&mut self, field: &tracing::field::Field, value: f64) {
|
||||
if self.accept_field(field) {
|
||||
self.state = Some(self.serializer.serialize_value(&value));
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn record_i64(&mut self, field: &tracing::field::Field, value: i64) {
|
||||
if self.accept_field(field) {
|
||||
self.state = Some(self.serializer.serialize_value(&value));
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
|
||||
if self.accept_field(field) {
|
||||
self.state = Some(self.serializer.serialize_value(&value));
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn record_i128(&mut self, field: &tracing::field::Field, value: i128) {
|
||||
if self.accept_field(field) {
|
||||
self.state = Some(self.serializer.serialize_value(&value));
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn record_u128(&mut self, field: &tracing::field::Field, value: u128) {
|
||||
if self.accept_field(field) {
|
||||
self.state = Some(self.serializer.serialize_value(&value));
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn record_bool(&mut self, field: &tracing::field::Field, value: bool) {
|
||||
if self.accept_field(field) {
|
||||
self.state = Some(self.serializer.serialize_value(&value));
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn record_bytes(&mut self, field: &tracing::field::Field, value: &[u8]) {
|
||||
if self.accept_field(field) {
|
||||
self.state = Some(self.serializer.serialize_value(&format_args!("{value:x?}")));
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
|
||||
if self.accept_field(field) {
|
||||
self.state = Some(self.serializer.serialize_value(&value));
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
|
||||
if self.accept_field(field) {
|
||||
self.state = Some(self.serializer.serialize_value(&format_args!("{value:?}")));
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn record_error(
|
||||
&mut self,
|
||||
field: &tracing::field::Field,
|
||||
value: &(dyn std::error::Error + 'static),
|
||||
) {
|
||||
if self.accept_field(field) {
|
||||
self.state = Some(self.serializer.serialize_value(&format_args!("{value}")));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Checks if there's any fields and field values present. If not, the JSON subobject
|
||||
/// can be skipped.
|
||||
// This is entirely optional and only cosmetic, though maybe helps a
|
||||
// bit during log parsing in dashboards when there's no field with empty object.
|
||||
struct FieldsPresent<'a>(pub bool, Option<&'a SkippedFieldIndices>);
|
||||
|
||||
// Even though some methods have an overhead (error, bytes) it is assumed the
|
||||
// compiler won't include this since we ignore the value entirely.
|
||||
impl tracing::field::Visit for FieldsPresent<'_> {
|
||||
#[inline]
|
||||
fn record_debug(&mut self, field: &tracing::field::Field, _: &dyn std::fmt::Debug) {
|
||||
if !self.1.is_some_and(|i| i.contains(field.index()))
|
||||
&& field.name() != MESSAGE_FIELD
|
||||
&& !field.name().starts_with("log.")
|
||||
{
|
||||
self.0 |= true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Serializes the fields directly supplied with a log event.
|
||||
struct SerializableEventFields<'a, 'event>(
|
||||
&'a tracing::Event<'event>,
|
||||
Option<&'a SkippedFieldIndices>,
|
||||
);
|
||||
|
||||
impl serde::ser::Serialize for SerializableEventFields<'_, '_> {
|
||||
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
|
||||
where
|
||||
S: Serializer,
|
||||
{
|
||||
use serde::ser::SerializeMap;
|
||||
let serializer = serializer.serialize_map(None)?;
|
||||
let mut message_skipper = MessageFieldSkipper::new(serializer, self.1);
|
||||
self.0.record(&mut message_skipper);
|
||||
let serializer = message_skipper.into_serializer()?;
|
||||
serializer.end()
|
||||
}
|
||||
}
|
||||
|
||||
/// A tracing field visitor that skips the message field.
|
||||
struct MessageFieldSkipper<'a, S: serde::ser::SerializeMap> {
|
||||
serializer: S,
|
||||
skipped_field_indices: Option<&'a SkippedFieldIndices>,
|
||||
state: Result<(), S::Error>,
|
||||
}
|
||||
|
||||
impl<'a, S: serde::ser::SerializeMap> MessageFieldSkipper<'a, S> {
|
||||
#[inline]
|
||||
fn new(serializer: S, skipped_field_indices: Option<&'a SkippedFieldIndices>) -> Self {
|
||||
Self {
|
||||
serializer,
|
||||
skipped_field_indices,
|
||||
state: Ok(()),
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn accept_field(&self, field: &tracing::field::Field) -> bool {
|
||||
self.state.is_ok()
|
||||
&& field.name() != MESSAGE_FIELD
|
||||
&& !field.name().starts_with("log.")
|
||||
&& !self
|
||||
.skipped_field_indices
|
||||
.is_some_and(|i| i.contains(field.index()))
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn into_serializer(self) -> Result<S, S::Error> {
|
||||
self.state?;
|
||||
Ok(self.serializer)
|
||||
}
|
||||
}
|
||||
|
||||
impl<S: serde::ser::SerializeMap> tracing::field::Visit for MessageFieldSkipper<'_, S> {
|
||||
#[inline]
|
||||
fn record_f64(&mut self, field: &tracing::field::Field, value: f64) {
|
||||
if self.accept_field(field) {
|
||||
self.state = self.serializer.serialize_entry(field.name(), &value);
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn record_i64(&mut self, field: &tracing::field::Field, value: i64) {
|
||||
if self.accept_field(field) {
|
||||
self.state = self.serializer.serialize_entry(field.name(), &value);
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
|
||||
if self.accept_field(field) {
|
||||
self.state = self.serializer.serialize_entry(field.name(), &value);
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn record_i128(&mut self, field: &tracing::field::Field, value: i128) {
|
||||
if self.accept_field(field) {
|
||||
self.state = self.serializer.serialize_entry(field.name(), &value);
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn record_u128(&mut self, field: &tracing::field::Field, value: u128) {
|
||||
if self.accept_field(field) {
|
||||
self.state = self.serializer.serialize_entry(field.name(), &value);
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn record_bool(&mut self, field: &tracing::field::Field, value: bool) {
|
||||
if self.accept_field(field) {
|
||||
self.state = self.serializer.serialize_entry(field.name(), &value);
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn record_bytes(&mut self, field: &tracing::field::Field, value: &[u8]) {
|
||||
if self.accept_field(field) {
|
||||
self.state = self
|
||||
.serializer
|
||||
.serialize_entry(field.name(), &format_args!("{value:x?}"));
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
|
||||
if self.accept_field(field) {
|
||||
self.state = self.serializer.serialize_entry(field.name(), &value);
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
|
||||
if self.accept_field(field) {
|
||||
self.state = self
|
||||
.serializer
|
||||
.serialize_entry(field.name(), &format_args!("{value:?}"));
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn record_error(
|
||||
&mut self,
|
||||
field: &tracing::field::Field,
|
||||
value: &(dyn std::error::Error + 'static),
|
||||
) {
|
||||
if self.accept_field(field) {
|
||||
self.state = self.serializer.serialize_value(&format_args!("{value}"));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Serializes the span stack from root to leaf (parent of event) enumerated
|
||||
/// inside an object where the keys are just the number padded with zeroes
|
||||
/// to retain sorting order.
|
||||
// The object is necessary because Loki cannot flatten arrays.
|
||||
struct SerializableSpanStack<'a, 'b, Span>(&'b Context<'a, Span>)
|
||||
where
|
||||
Span: Subscriber + for<'lookup> LookupSpan<'lookup>;
|
||||
|
||||
impl<Span> serde::ser::Serialize for SerializableSpanStack<'_, '_, Span>
|
||||
where
|
||||
Span: Subscriber + for<'lookup> LookupSpan<'lookup>,
|
||||
{
|
||||
fn serialize<Ser>(&self, serializer: Ser) -> Result<Ser::Ok, Ser::Error>
|
||||
where
|
||||
Ser: serde::ser::Serializer,
|
||||
{
|
||||
let mut serializer = serializer.serialize_map(None)?;
|
||||
|
||||
if let Some(leaf_span) = self.0.lookup_current() {
|
||||
for (i, span) in leaf_span.scope().from_root().enumerate() {
|
||||
serializer.serialize_entry(&format_args!("{i:02}"), &SerializableSpan(&span))?;
|
||||
}
|
||||
}
|
||||
|
||||
serializer.end()
|
||||
}
|
||||
}
|
||||
|
||||
/// Serializes a single span. Include the span ID, name and its fields as
|
||||
/// recorded up to this point.
|
||||
struct SerializableSpan<'a, 'b, Span>(&'b SpanRef<'a, Span>)
|
||||
where
|
||||
Span: for<'lookup> LookupSpan<'lookup>;
|
||||
|
||||
impl<Span> serde::ser::Serialize for SerializableSpan<'_, '_, Span>
|
||||
where
|
||||
Span: for<'lookup> LookupSpan<'lookup>,
|
||||
{
|
||||
fn serialize<Ser>(&self, serializer: Ser) -> Result<Ser::Ok, Ser::Error>
|
||||
where
|
||||
Ser: serde::ser::Serializer,
|
||||
{
|
||||
let mut serializer = serializer.serialize_map(None)?;
|
||||
// TODO: the span ID is probably only useful for debugging tracing.
|
||||
serializer.serialize_entry("span_id", &format_args!("{:016x}", self.0.id().into_u64()))?;
|
||||
serializer.serialize_entry("span_name", self.0.metadata().name())?;
|
||||
|
||||
let ext = self.0.extensions();
|
||||
if let Some(data) = ext.get::<SpanFields>() {
|
||||
for (key, value) in &data.fields.pin() {
|
||||
serializer.serialize_entry(key, value)?;
|
||||
}
|
||||
}
|
||||
|
||||
serializer.end()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
#[allow(clippy::unwrap_used)]
|
||||
mod tests {
|
||||
use std::sync::{Arc, Mutex, MutexGuard};
|
||||
|
||||
use assert_json_diff::assert_json_eq;
|
||||
use tracing::info_span;
|
||||
|
||||
use super::*;
|
||||
|
||||
struct TestClock {
|
||||
current_time: Mutex<DateTime<Utc>>,
|
||||
}
|
||||
|
||||
impl Clock for Arc<TestClock> {
|
||||
fn now(&self) -> DateTime<Utc> {
|
||||
*self.current_time.lock().expect("poisoned")
|
||||
}
|
||||
}
|
||||
|
||||
struct VecWriter<'a> {
|
||||
buffer: MutexGuard<'a, Vec<u8>>,
|
||||
}
|
||||
|
||||
impl MakeWriter for Arc<Mutex<Vec<u8>>> {
|
||||
fn make_writer(&self) -> impl io::Write {
|
||||
VecWriter {
|
||||
buffer: self.lock().expect("poisoned"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl io::Write for VecWriter<'_> {
|
||||
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
|
||||
self.buffer.write(buf)
|
||||
}
|
||||
|
||||
fn flush(&mut self) -> io::Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_field_collection() {
|
||||
let clock = Arc::new(TestClock {
|
||||
current_time: Mutex::new(Utc::now()),
|
||||
});
|
||||
let buffer = Arc::new(Mutex::new(Vec::new()));
|
||||
let log_layer = JsonLoggingLayer {
|
||||
clock: clock.clone(),
|
||||
skipped_field_indices: papaya::HashMap::default(),
|
||||
writer: buffer.clone(),
|
||||
};
|
||||
|
||||
let registry = tracing_subscriber::Registry::default().with(log_layer);
|
||||
|
||||
tracing::subscriber::with_default(registry, || {
|
||||
info_span!("span1", x = 40, x = 41, x = 42).in_scope(|| {
|
||||
info_span!("span2").in_scope(|| {
|
||||
tracing::error!(
|
||||
a = 1,
|
||||
a = 2,
|
||||
a = 3,
|
||||
message = "explicit message field",
|
||||
"implicit message field"
|
||||
);
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
let buffer = Arc::try_unwrap(buffer)
|
||||
.expect("no other reference")
|
||||
.into_inner()
|
||||
.expect("poisoned");
|
||||
let actual: serde_json::Value = serde_json::from_slice(&buffer).expect("valid JSON");
|
||||
let expected: serde_json::Value = serde_json::json!(
|
||||
{
|
||||
"timestamp": clock.now().to_rfc3339_opts(chrono::SecondsFormat::Micros, true),
|
||||
"level": "ERROR",
|
||||
"message": "explicit message field",
|
||||
"fields": {
|
||||
"a": 3,
|
||||
},
|
||||
"spans": {
|
||||
"00":{
|
||||
"span_id": "0000000000000001",
|
||||
"span_name": "span1",
|
||||
"x": 42,
|
||||
},
|
||||
"01": {
|
||||
"span_id": "0000000000000002",
|
||||
"span_name": "span2",
|
||||
}
|
||||
},
|
||||
"src": actual.as_object().unwrap().get("src").unwrap().as_str().unwrap(),
|
||||
"target": "proxy::logging::tests",
|
||||
"process_id": actual.as_object().unwrap().get("process_id").unwrap().as_number().unwrap(),
|
||||
"thread_id": actual.as_object().unwrap().get("thread_id").unwrap().as_number().unwrap(),
|
||||
"thread_name": "logging::tests::test_field_collection",
|
||||
}
|
||||
);
|
||||
|
||||
assert_json_eq!(actual, expected);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -120,6 +120,20 @@ pub enum InterpretedWalReaderError {
|
||||
WalStreamClosed,
|
||||
}
|
||||
|
||||
enum CurrentPositionUpdate {
|
||||
Reset(Lsn),
|
||||
NotReset(Lsn),
|
||||
}
|
||||
|
||||
impl CurrentPositionUpdate {
|
||||
fn current_position(&self) -> Lsn {
|
||||
match self {
|
||||
CurrentPositionUpdate::Reset(lsn) => *lsn,
|
||||
CurrentPositionUpdate::NotReset(lsn) => *lsn,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl InterpretedWalReaderState {
|
||||
fn current_position(&self) -> Option<Lsn> {
|
||||
match self {
|
||||
@@ -129,6 +143,26 @@ impl InterpretedWalReaderState {
|
||||
InterpretedWalReaderState::Done => None,
|
||||
}
|
||||
}
|
||||
|
||||
// Reset the current position of the WAL reader if the requested starting position
|
||||
// of the new shard is smaller than the current value.
|
||||
fn maybe_reset(&mut self, new_shard_start_pos: Lsn) -> CurrentPositionUpdate {
|
||||
match self {
|
||||
InterpretedWalReaderState::Running {
|
||||
current_position, ..
|
||||
} => {
|
||||
if new_shard_start_pos < *current_position {
|
||||
*current_position = new_shard_start_pos;
|
||||
CurrentPositionUpdate::Reset(*current_position)
|
||||
} else {
|
||||
CurrentPositionUpdate::NotReset(*current_position)
|
||||
}
|
||||
}
|
||||
InterpretedWalReaderState::Done => {
|
||||
panic!("maybe_reset called on finished reader")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) struct AttachShardNotification {
|
||||
@@ -410,15 +444,24 @@ impl InterpretedWalReader {
|
||||
};
|
||||
|
||||
senders.push(ShardSenderState { sender_id: new_sender_id, tx: sender, next_record_lsn: start_pos});
|
||||
let current_pos = self.state.read().unwrap().current_position().unwrap();
|
||||
if start_pos < current_pos {
|
||||
self.wal_stream.reset(start_pos).await;
|
||||
wal_decoder = WalStreamDecoder::new(start_pos, self.pg_version);
|
||||
}
|
||||
|
||||
// If the shard is subscribing below the current position the we need
|
||||
// to update the cursor that tracks where we are at in the WAL
|
||||
// ([`Self::state`]) and reset the WAL stream itself
|
||||
// (`[Self::wal_stream`]). This must be done atomically from the POV of
|
||||
// anything outside the select statement.
|
||||
let position_reset = self.state.write().unwrap().maybe_reset(start_pos);
|
||||
match position_reset {
|
||||
CurrentPositionUpdate::Reset(to) => {
|
||||
self.wal_stream.reset(to).await;
|
||||
wal_decoder = WalStreamDecoder::new(to, self.pg_version);
|
||||
},
|
||||
CurrentPositionUpdate::NotReset(_) => {}
|
||||
};
|
||||
|
||||
tracing::info!(
|
||||
"Added shard sender {} with start_pos={} current_pos={}",
|
||||
ShardSenderId::new(shard_id, new_sender_id), start_pos, current_pos
|
||||
ShardSenderId::new(shard_id, new_sender_id), start_pos, position_reset.current_position()
|
||||
);
|
||||
}
|
||||
}
|
||||
@@ -584,7 +627,7 @@ mod tests {
|
||||
.unwrap();
|
||||
|
||||
let resident_tli = tli.wal_residence_guard().await.unwrap();
|
||||
let end_watch = Env::write_wal(tli, start_lsn, SIZE, MSG_COUNT)
|
||||
let end_watch = Env::write_wal(tli, start_lsn, SIZE, MSG_COUNT, None)
|
||||
.await
|
||||
.unwrap();
|
||||
let end_pos = end_watch.get();
|
||||
@@ -715,7 +758,6 @@ mod tests {
|
||||
const MSG_COUNT: usize = 200;
|
||||
const PG_VERSION: u32 = 17;
|
||||
const SHARD_COUNT: u8 = 2;
|
||||
const ATTACHED_SHARDS: u8 = 4;
|
||||
|
||||
let start_lsn = Lsn::from_str("0/149FD18").unwrap();
|
||||
let env = Env::new(true).unwrap();
|
||||
@@ -725,9 +767,11 @@ mod tests {
|
||||
.unwrap();
|
||||
|
||||
let resident_tli = tli.wal_residence_guard().await.unwrap();
|
||||
let end_watch = Env::write_wal(tli, start_lsn, SIZE, MSG_COUNT)
|
||||
.await
|
||||
.unwrap();
|
||||
let mut next_record_lsns = Vec::default();
|
||||
let end_watch =
|
||||
Env::write_wal(tli, start_lsn, SIZE, MSG_COUNT, Some(&mut next_record_lsns))
|
||||
.await
|
||||
.unwrap();
|
||||
let end_pos = end_watch.get();
|
||||
|
||||
let streaming_wal_reader = StreamingWalReader::new(
|
||||
@@ -746,38 +790,71 @@ mod tests {
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let (tx, rx) = tokio::sync::mpsc::channel::<Batch>(MSG_COUNT * 2);
|
||||
let mut batch_receivers = vec![rx];
|
||||
struct Sender {
|
||||
tx: Option<tokio::sync::mpsc::Sender<Batch>>,
|
||||
rx: tokio::sync::mpsc::Receiver<Batch>,
|
||||
shard: ShardIdentity,
|
||||
start_lsn: Lsn,
|
||||
received_next_record_lsns: Vec<Lsn>,
|
||||
}
|
||||
|
||||
impl Sender {
|
||||
fn new(start_lsn: Lsn, shard: ShardIdentity) -> Self {
|
||||
let (tx, rx) = tokio::sync::mpsc::channel::<Batch>(MSG_COUNT * 2);
|
||||
Self {
|
||||
tx: Some(tx),
|
||||
rx,
|
||||
shard,
|
||||
start_lsn,
|
||||
received_next_record_lsns: Vec::default(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
assert!(next_record_lsns.len() > 7);
|
||||
let start_lsns = vec![
|
||||
next_record_lsns[5],
|
||||
next_record_lsns[1],
|
||||
next_record_lsns[3],
|
||||
];
|
||||
let mut senders = start_lsns
|
||||
.into_iter()
|
||||
.map(|lsn| Sender::new(lsn, shard_0))
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let first_sender = senders.first_mut().unwrap();
|
||||
let handle = InterpretedWalReader::spawn(
|
||||
streaming_wal_reader,
|
||||
start_lsn,
|
||||
tx,
|
||||
shard_0,
|
||||
first_sender.start_lsn,
|
||||
first_sender.tx.take().unwrap(),
|
||||
first_sender.shard,
|
||||
PG_VERSION,
|
||||
&Some("pageserver".to_string()),
|
||||
);
|
||||
|
||||
for _ in 0..(ATTACHED_SHARDS - 1) {
|
||||
let (tx, rx) = tokio::sync::mpsc::channel::<Batch>(MSG_COUNT * 2);
|
||||
handle.fanout(shard_0, tx, start_lsn).unwrap();
|
||||
batch_receivers.push(rx);
|
||||
for sender in senders.iter_mut().skip(1) {
|
||||
handle
|
||||
.fanout(sender.shard, sender.tx.take().unwrap(), sender.start_lsn)
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
loop {
|
||||
let batch = batch_receivers.first_mut().unwrap().recv().await.unwrap();
|
||||
for rx in batch_receivers.iter_mut().skip(1) {
|
||||
let other_batch = rx.recv().await.unwrap();
|
||||
|
||||
assert_eq!(batch.wal_end_lsn, other_batch.wal_end_lsn);
|
||||
assert_eq!(
|
||||
batch.available_wal_end_lsn,
|
||||
other_batch.available_wal_end_lsn
|
||||
for sender in senders.iter_mut() {
|
||||
loop {
|
||||
let batch = sender.rx.recv().await.unwrap();
|
||||
tracing::info!(
|
||||
"Sender with start_lsn={} received batch ending at {} with {} records",
|
||||
sender.start_lsn,
|
||||
batch.wal_end_lsn,
|
||||
batch.records.records.len()
|
||||
);
|
||||
}
|
||||
|
||||
if batch.wal_end_lsn == batch.available_wal_end_lsn {
|
||||
break;
|
||||
for rec in batch.records.records {
|
||||
sender.received_next_record_lsns.push(rec.next_record_lsn);
|
||||
}
|
||||
|
||||
if batch.wal_end_lsn == batch.available_wal_end_lsn {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -792,5 +869,20 @@ mod tests {
|
||||
}
|
||||
|
||||
assert!(done);
|
||||
|
||||
for sender in senders {
|
||||
tracing::info!(
|
||||
"Validating records received by sender with start_lsn={}",
|
||||
sender.start_lsn
|
||||
);
|
||||
|
||||
assert!(sender.received_next_record_lsns.is_sorted());
|
||||
let expected = next_record_lsns
|
||||
.iter()
|
||||
.filter(|lsn| **lsn > sender.start_lsn)
|
||||
.copied()
|
||||
.collect::<Vec<_>>();
|
||||
assert_eq!(sender.received_next_record_lsns, expected);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -122,6 +122,7 @@ impl Env {
|
||||
start_lsn: Lsn,
|
||||
msg_size: usize,
|
||||
msg_count: usize,
|
||||
mut next_record_lsns: Option<&mut Vec<Lsn>>,
|
||||
) -> anyhow::Result<EndWatch> {
|
||||
let (msg_tx, msg_rx) = tokio::sync::mpsc::channel(receive_wal::MSG_QUEUE_SIZE);
|
||||
let (reply_tx, mut reply_rx) = tokio::sync::mpsc::channel(receive_wal::REPLY_QUEUE_SIZE);
|
||||
@@ -130,7 +131,7 @@ impl Env {
|
||||
|
||||
WalAcceptor::spawn(tli.wal_residence_guard().await?, msg_rx, reply_tx, Some(0));
|
||||
|
||||
let prefix = c"p";
|
||||
let prefix = c"neon-file:";
|
||||
let prefixlen = prefix.to_bytes_with_nul().len();
|
||||
assert!(msg_size >= prefixlen);
|
||||
let message = vec![0; msg_size - prefixlen];
|
||||
@@ -139,6 +140,9 @@ impl Env {
|
||||
&mut WalGenerator::new(LogicalMessageGenerator::new(prefix, &message), start_lsn);
|
||||
for _ in 0..msg_count {
|
||||
let (lsn, record) = walgen.next().unwrap();
|
||||
if let Some(ref mut lsns) = next_record_lsns {
|
||||
lsns.push(lsn);
|
||||
}
|
||||
|
||||
let req = AppendRequest {
|
||||
h: AppendRequestHeader {
|
||||
|
||||
@@ -246,7 +246,7 @@ mod tests {
|
||||
.unwrap();
|
||||
|
||||
let resident_tli = tli.wal_residence_guard().await.unwrap();
|
||||
let end_watch = Env::write_wal(tli, start_lsn, SIZE, MSG_COUNT)
|
||||
let end_watch = Env::write_wal(tli, start_lsn, SIZE, MSG_COUNT, None)
|
||||
.await
|
||||
.unwrap();
|
||||
let end_pos = end_watch.get();
|
||||
|
||||
@@ -225,7 +225,7 @@ pub(crate) enum NotifyError {
|
||||
// We shutdown while sending
|
||||
#[error("Shutting down")]
|
||||
ShuttingDown,
|
||||
// A response indicates we will never succeed, such as 400 or 404
|
||||
// A response indicates we will never succeed, such as 400 or 403
|
||||
#[error("Non-retryable error {0}")]
|
||||
Fatal(StatusCode),
|
||||
|
||||
|
||||
@@ -27,7 +27,7 @@ use pageserver_api::shard::ShardConfigError;
|
||||
use pageserver_api::shard::ShardIdentity;
|
||||
use pageserver_api::shard::ShardStripeSize;
|
||||
use pageserver_api::shard::{ShardCount, ShardNumber, TenantShardId};
|
||||
use rustls::client::danger::ServerCertVerifier;
|
||||
use rustls::client::danger::{ServerCertVerified, ServerCertVerifier};
|
||||
use rustls::client::WebPkiServerVerifier;
|
||||
use rustls::crypto::ring;
|
||||
use scoped_futures::ScopedBoxFuture;
|
||||
@@ -194,6 +194,8 @@ impl Persistence {
|
||||
timeout: Duration,
|
||||
) -> Result<(), diesel::ConnectionError> {
|
||||
let started_at = Instant::now();
|
||||
log_postgres_connstr_info(database_url)
|
||||
.map_err(|e| diesel::ConnectionError::InvalidConnectionUrl(e.to_string()))?;
|
||||
loop {
|
||||
match establish_connection_rustls(database_url).await {
|
||||
Ok(_) => {
|
||||
@@ -1281,6 +1283,51 @@ pub(crate) fn load_certs() -> anyhow::Result<Arc<rustls::RootCertStore>> {
|
||||
Ok(Arc::new(store))
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
/// A verifier that accepts all certificates (but logs an error still)
|
||||
struct AcceptAll(Arc<WebPkiServerVerifier>);
|
||||
impl ServerCertVerifier for AcceptAll {
|
||||
fn verify_server_cert(
|
||||
&self,
|
||||
end_entity: &rustls::pki_types::CertificateDer<'_>,
|
||||
intermediates: &[rustls::pki_types::CertificateDer<'_>],
|
||||
server_name: &rustls::pki_types::ServerName<'_>,
|
||||
ocsp_response: &[u8],
|
||||
now: rustls::pki_types::UnixTime,
|
||||
) -> Result<ServerCertVerified, rustls::Error> {
|
||||
let r =
|
||||
self.0
|
||||
.verify_server_cert(end_entity, intermediates, server_name, ocsp_response, now);
|
||||
if let Err(err) = r {
|
||||
tracing::info!(
|
||||
?server_name,
|
||||
"ignoring db connection TLS validation error: {err:?}"
|
||||
);
|
||||
return Ok(ServerCertVerified::assertion());
|
||||
}
|
||||
r
|
||||
}
|
||||
fn verify_tls12_signature(
|
||||
&self,
|
||||
message: &[u8],
|
||||
cert: &rustls::pki_types::CertificateDer<'_>,
|
||||
dss: &rustls::DigitallySignedStruct,
|
||||
) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
|
||||
self.0.verify_tls12_signature(message, cert, dss)
|
||||
}
|
||||
fn verify_tls13_signature(
|
||||
&self,
|
||||
message: &[u8],
|
||||
cert: &rustls::pki_types::CertificateDer<'_>,
|
||||
dss: &rustls::DigitallySignedStruct,
|
||||
) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
|
||||
self.0.verify_tls13_signature(message, cert, dss)
|
||||
}
|
||||
fn supported_verify_schemes(&self) -> Vec<rustls::SignatureScheme> {
|
||||
self.0.supported_verify_schemes()
|
||||
}
|
||||
}
|
||||
|
||||
/// Loads the root certificates and constructs a client config suitable for connecting.
|
||||
/// This function is blocking.
|
||||
fn client_config_with_root_certs() -> anyhow::Result<rustls::ClientConfig> {
|
||||
@@ -1290,76 +1337,12 @@ fn client_config_with_root_certs() -> anyhow::Result<rustls::ClientConfig> {
|
||||
.expect("ring should support the default protocol versions");
|
||||
static DO_CERT_CHECKS: std::sync::OnceLock<bool> = std::sync::OnceLock::new();
|
||||
let do_cert_checks =
|
||||
DO_CERT_CHECKS.get_or_init(|| std::env::var("STORCON_CERT_CHECKS").is_ok());
|
||||
DO_CERT_CHECKS.get_or_init(|| std::env::var("STORCON_DB_CERT_CHECKS").is_ok());
|
||||
Ok(if *do_cert_checks {
|
||||
client_config
|
||||
.with_root_certificates(load_certs()?)
|
||||
.with_no_client_auth()
|
||||
} else {
|
||||
use rustls::client::danger::{HandshakeSignatureValid, ServerCertVerified};
|
||||
#[derive(Debug)]
|
||||
struct AcceptAll(Arc<WebPkiServerVerifier>);
|
||||
impl ServerCertVerifier for AcceptAll {
|
||||
fn verify_server_cert(
|
||||
&self,
|
||||
end_entity: &rustls::pki_types::CertificateDer<'_>,
|
||||
intermediates: &[rustls::pki_types::CertificateDer<'_>],
|
||||
server_name: &rustls::pki_types::ServerName<'_>,
|
||||
ocsp_response: &[u8],
|
||||
now: rustls::pki_types::UnixTime,
|
||||
) -> Result<ServerCertVerified, rustls::Error> {
|
||||
let r = self.0.verify_server_cert(
|
||||
end_entity,
|
||||
intermediates,
|
||||
server_name,
|
||||
ocsp_response,
|
||||
now,
|
||||
);
|
||||
if let Err(err) = r {
|
||||
tracing::info!(
|
||||
?server_name,
|
||||
"ignoring db connection TLS validation error: {err:?}"
|
||||
);
|
||||
return Ok(ServerCertVerified::assertion());
|
||||
}
|
||||
r
|
||||
}
|
||||
fn verify_tls12_signature(
|
||||
&self,
|
||||
message: &[u8],
|
||||
cert: &rustls::pki_types::CertificateDer<'_>,
|
||||
dss: &rustls::DigitallySignedStruct,
|
||||
) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error>
|
||||
{
|
||||
let r = self.0.verify_tls12_signature(message, cert, dss);
|
||||
if let Err(err) = r {
|
||||
tracing::info!(
|
||||
"ignoring db connection 1.2 signature TLS validation error: {err:?}"
|
||||
);
|
||||
return Ok(HandshakeSignatureValid::assertion());
|
||||
}
|
||||
r
|
||||
}
|
||||
fn verify_tls13_signature(
|
||||
&self,
|
||||
message: &[u8],
|
||||
cert: &rustls::pki_types::CertificateDer<'_>,
|
||||
dss: &rustls::DigitallySignedStruct,
|
||||
) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error>
|
||||
{
|
||||
let r = self.0.verify_tls13_signature(message, cert, dss);
|
||||
if let Err(err) = r {
|
||||
tracing::info!(
|
||||
"ignoring db connection 1.3 signature TLS validation error: {err:?}"
|
||||
);
|
||||
return Ok(HandshakeSignatureValid::assertion());
|
||||
}
|
||||
r
|
||||
}
|
||||
fn supported_verify_schemes(&self) -> Vec<rustls::SignatureScheme> {
|
||||
self.0.supported_verify_schemes()
|
||||
}
|
||||
}
|
||||
let verifier = AcceptAll(
|
||||
WebPkiServerVerifier::builder_with_provider(
|
||||
load_certs()?,
|
||||
@@ -1389,6 +1372,29 @@ fn establish_connection_rustls(config: &str) -> BoxFuture<ConnectionResult<Async
|
||||
fut.boxed()
|
||||
}
|
||||
|
||||
#[cfg_attr(test, test)]
|
||||
fn test_config_debug_censors_password() {
|
||||
let has_pw =
|
||||
"host=/var/lib/postgresql,localhost port=1234 user=specialuser password='NOT ALLOWED TAG'";
|
||||
let has_pw_cfg = has_pw.parse::<tokio_postgres::Config>().unwrap();
|
||||
assert!(format!("{has_pw_cfg:?}").contains("specialuser"));
|
||||
// Ensure that the password is not leaked by the debug impl
|
||||
assert!(!format!("{has_pw_cfg:?}").contains("NOT ALLOWED TAG"));
|
||||
}
|
||||
|
||||
fn log_postgres_connstr_info(config_str: &str) -> anyhow::Result<()> {
|
||||
let config = config_str
|
||||
.parse::<tokio_postgres::Config>()
|
||||
.map_err(|_e| anyhow::anyhow!("Couldn't parse config str"))?;
|
||||
// We use debug formatting here, and use a unit test to ensure that we don't leak the password.
|
||||
// To make extra sure the test gets ran, run it every time the function is called
|
||||
// (this is rather cold code, we can afford it).
|
||||
#[cfg(not(test))]
|
||||
test_config_debug_censors_password();
|
||||
tracing::info!("database connection config: {config:?}");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Parts of [`crate::tenant_shard::TenantShard`] that are stored durably
|
||||
#[derive(
|
||||
QueryableByName, Queryable, Selectable, Insertable, Serialize, Deserialize, Clone, Eq, PartialEq,
|
||||
|
||||
@@ -115,6 +115,15 @@ impl ReconcilerConfigBuilder {
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn tenant_creation_hint(self, hint: bool) -> Self {
|
||||
Self {
|
||||
config: ReconcilerConfig {
|
||||
tenant_creation_hint: hint,
|
||||
..self.config
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn build(self) -> ReconcilerConfig {
|
||||
self.config
|
||||
}
|
||||
@@ -129,6 +138,10 @@ pub(crate) struct ReconcilerConfig {
|
||||
// During live migrations this is the amount of time that
|
||||
// the pagserver will hold our poll.
|
||||
secondary_download_request_timeout: Option<Duration>,
|
||||
|
||||
// A hint indicating whether this reconciliation is done on the
|
||||
// creation of a new tenant. This only informs logging behaviour.
|
||||
tenant_creation_hint: bool,
|
||||
}
|
||||
|
||||
impl ReconcilerConfig {
|
||||
@@ -143,6 +156,10 @@ impl ReconcilerConfig {
|
||||
self.secondary_download_request_timeout
|
||||
.unwrap_or(SECONDARY_DOWNLOAD_REQUEST_TIMEOUT_DEFAULT)
|
||||
}
|
||||
|
||||
pub(crate) fn tenant_creation_hint(&self) -> bool {
|
||||
self.tenant_creation_hint
|
||||
}
|
||||
}
|
||||
|
||||
/// RAII resource units granted to a Reconciler, which it should keep alive until it finishes doing I/O
|
||||
@@ -934,16 +951,35 @@ impl Reconciler {
|
||||
)
|
||||
.await;
|
||||
if let Err(e) = &result {
|
||||
// It is up to the caller whether they want to drop out on this error, but they don't have to:
|
||||
// in general we should avoid letting unavailability of the cloud control plane stop us from
|
||||
// making progress.
|
||||
if !matches!(e, NotifyError::ShuttingDown) {
|
||||
tracing::warn!("Failed to notify compute of attached pageserver {node}: {e}");
|
||||
}
|
||||
|
||||
// Set this flag so that in our ReconcileResult we will set the flag on the shard that it
|
||||
// needs to retry at some point.
|
||||
self.compute_notify_failure = true;
|
||||
|
||||
// It is up to the caller whether they want to drop out on this error, but they don't have to:
|
||||
// in general we should avoid letting unavailability of the cloud control plane stop us from
|
||||
// making progress.
|
||||
match e {
|
||||
// 404s from cplane during tenant creation are expected.
|
||||
// Cplane only persists the shards to the database after
|
||||
// creating the tenant and the timeline. If we notify before
|
||||
// that, we'll get a 404.
|
||||
//
|
||||
// This is fine because tenant creations happen via /location_config
|
||||
// and that returns the list of locations in the response. Hence, we
|
||||
// silence the error and return Ok(()) here. Reconciliation will still
|
||||
// be retried because we set [`Reconciler::compute_notify_failure`] above.
|
||||
NotifyError::Unexpected(hyper::StatusCode::NOT_FOUND)
|
||||
if self.reconciler_config.tenant_creation_hint() =>
|
||||
{
|
||||
return Ok(());
|
||||
}
|
||||
NotifyError::ShuttingDown => {}
|
||||
_ => {
|
||||
tracing::warn!(
|
||||
"Failed to notify compute of attached pageserver {node}: {e}"
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
result
|
||||
} else {
|
||||
|
||||
@@ -2238,9 +2238,14 @@ impl Service {
|
||||
let waiters = {
|
||||
let mut locked = self.inner.write().unwrap();
|
||||
let (nodes, tenants, _scheduler) = locked.parts_mut();
|
||||
let config = ReconcilerConfigBuilder::new()
|
||||
.tenant_creation_hint(true)
|
||||
.build();
|
||||
tenants
|
||||
.range_mut(TenantShardId::tenant_range(tenant_id))
|
||||
.filter_map(|(_shard_id, shard)| self.maybe_reconcile_shard(shard, nodes))
|
||||
.filter_map(|(_shard_id, shard)| {
|
||||
self.maybe_configured_reconcile_shard(shard, nodes, config)
|
||||
})
|
||||
.collect::<Vec<_>>()
|
||||
};
|
||||
|
||||
|
||||
@@ -707,6 +707,7 @@ impl TenantShard {
|
||||
if let Some(node_id) = self.intent.get_attached() {
|
||||
// Populate secondary by demoting the attached node
|
||||
self.intent.demote_attached(scheduler, *node_id);
|
||||
|
||||
modified = true;
|
||||
} else if self.intent.secondary.is_empty() {
|
||||
// Populate secondary by scheduling a fresh node
|
||||
@@ -979,24 +980,51 @@ impl TenantShard {
|
||||
),
|
||||
)
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
.collect::<HashMap<_, _>>();
|
||||
|
||||
if secondary_scores.iter().any(|score| score.1.is_none()) {
|
||||
// Don't have full list of scores, so can't make a good decision about which to drop unless
|
||||
// there is an obvious one in the wrong AZ
|
||||
for secondary in self.intent.get_secondary() {
|
||||
if scheduler.get_node_az(secondary) == self.intent.preferred_az_id {
|
||||
// Trivial case: if we only have one secondary, drop that one
|
||||
if self.intent.get_secondary().len() == 1 {
|
||||
return Some(ScheduleOptimization {
|
||||
sequence: self.sequence,
|
||||
action: ScheduleOptimizationAction::RemoveSecondary(
|
||||
*self.intent.get_secondary().first().unwrap(),
|
||||
),
|
||||
});
|
||||
}
|
||||
|
||||
// Try to find a "good" secondary to keep, without relying on scores (one or more nodes is in a state
|
||||
// where its score can't be calculated), and drop the others. This enables us to make progress in
|
||||
// most cases, even if some nodes are offline or have scheduling=pause set.
|
||||
|
||||
debug_assert!(self.intent.attached.is_some()); // We should not make it here unless attached -- this
|
||||
// logic presumes we are in a mode where we want secondaries to be in non-home AZ
|
||||
if let Some(retain_secondary) = self.intent.get_secondary().iter().find(|n| {
|
||||
let in_home_az = scheduler.get_node_az(n) == self.intent.preferred_az_id;
|
||||
let is_available = secondary_scores
|
||||
.get(n)
|
||||
.expect("Built from same list of nodes")
|
||||
.is_some();
|
||||
is_available && !in_home_az
|
||||
}) {
|
||||
// Great, we found one to retain. Pick some other to drop.
|
||||
if let Some(victim) = self
|
||||
.intent
|
||||
.get_secondary()
|
||||
.iter()
|
||||
.find(|n| n != &retain_secondary)
|
||||
{
|
||||
return Some(ScheduleOptimization {
|
||||
sequence: self.sequence,
|
||||
action: ScheduleOptimizationAction::RemoveSecondary(*secondary),
|
||||
action: ScheduleOptimizationAction::RemoveSecondary(*victim),
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
// Fall through: we didn't identify one to remove. This ought to be rare.
|
||||
tracing::warn!("Keeping extra secondaries: can't determine which of {:?} to remove (some nodes offline?)",
|
||||
self.intent.get_secondary()
|
||||
);
|
||||
self.intent.get_secondary()
|
||||
);
|
||||
} else {
|
||||
let victim = secondary_scores
|
||||
.iter()
|
||||
@@ -1005,7 +1033,7 @@ impl TenantShard {
|
||||
.0;
|
||||
return Some(ScheduleOptimization {
|
||||
sequence: self.sequence,
|
||||
action: ScheduleOptimizationAction::RemoveSecondary(victim),
|
||||
action: ScheduleOptimizationAction::RemoveSecondary(*victim),
|
||||
});
|
||||
}
|
||||
}
|
||||
@@ -2379,6 +2407,110 @@ pub(crate) mod tests {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Test how the optimisation code behaves with an extra secondary
|
||||
#[test]
|
||||
fn optimize_removes_secondary() -> anyhow::Result<()> {
|
||||
let az_a_tag = AvailabilityZone("az-a".to_string());
|
||||
let az_b_tag = AvailabilityZone("az-b".to_string());
|
||||
let mut nodes = make_test_nodes(
|
||||
4,
|
||||
&[
|
||||
az_a_tag.clone(),
|
||||
az_b_tag.clone(),
|
||||
az_a_tag.clone(),
|
||||
az_b_tag.clone(),
|
||||
],
|
||||
);
|
||||
let mut scheduler = Scheduler::new(nodes.values());
|
||||
|
||||
let mut schedule_context = ScheduleContext::default();
|
||||
|
||||
let mut shard_a = make_test_tenant_shard(PlacementPolicy::Attached(1));
|
||||
shard_a.intent.preferred_az_id = Some(az_a_tag.clone());
|
||||
shard_a
|
||||
.schedule(&mut scheduler, &mut schedule_context)
|
||||
.unwrap();
|
||||
|
||||
// Attached on node 1, secondary on node 2
|
||||
assert_eq!(shard_a.intent.get_attached(), &Some(NodeId(1)));
|
||||
assert_eq!(shard_a.intent.get_secondary(), &vec![NodeId(2)]);
|
||||
|
||||
// Initially optimiser is idle
|
||||
assert_eq!(
|
||||
shard_a.optimize_attachment(&mut scheduler, &schedule_context),
|
||||
None
|
||||
);
|
||||
assert_eq!(
|
||||
shard_a.optimize_secondary(&mut scheduler, &schedule_context),
|
||||
None
|
||||
);
|
||||
|
||||
// A spare secondary in the home AZ: it should be removed -- this is the situation when we're midway through a graceful migration, after cutting over
|
||||
// to our new location
|
||||
shard_a.intent.push_secondary(&mut scheduler, NodeId(3));
|
||||
let optimization = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
|
||||
assert_eq!(
|
||||
optimization,
|
||||
Some(ScheduleOptimization {
|
||||
sequence: shard_a.sequence,
|
||||
action: ScheduleOptimizationAction::RemoveSecondary(NodeId(3))
|
||||
})
|
||||
);
|
||||
shard_a.apply_optimization(&mut scheduler, optimization.unwrap());
|
||||
|
||||
// A spare secondary in the non-home AZ, and one of them is offline
|
||||
shard_a.intent.push_secondary(&mut scheduler, NodeId(4));
|
||||
nodes
|
||||
.get_mut(&NodeId(4))
|
||||
.unwrap()
|
||||
.set_availability(NodeAvailability::Offline);
|
||||
scheduler.node_upsert(nodes.get(&NodeId(4)).unwrap());
|
||||
let optimization = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
|
||||
assert_eq!(
|
||||
optimization,
|
||||
Some(ScheduleOptimization {
|
||||
sequence: shard_a.sequence,
|
||||
action: ScheduleOptimizationAction::RemoveSecondary(NodeId(4))
|
||||
})
|
||||
);
|
||||
shard_a.apply_optimization(&mut scheduler, optimization.unwrap());
|
||||
|
||||
// A spare secondary when should have none
|
||||
shard_a.policy = PlacementPolicy::Attached(0);
|
||||
let optimization = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
|
||||
assert_eq!(
|
||||
optimization,
|
||||
Some(ScheduleOptimization {
|
||||
sequence: shard_a.sequence,
|
||||
action: ScheduleOptimizationAction::RemoveSecondary(NodeId(2))
|
||||
})
|
||||
);
|
||||
shard_a.apply_optimization(&mut scheduler, optimization.unwrap());
|
||||
assert_eq!(shard_a.intent.get_attached(), &Some(NodeId(1)));
|
||||
assert_eq!(shard_a.intent.get_secondary(), &vec![]);
|
||||
|
||||
// Check that in secondary mode, we preserve the secondary in the preferred AZ
|
||||
let mut schedule_context = ScheduleContext::default(); // Fresh context, we're about to call schedule()
|
||||
shard_a.policy = PlacementPolicy::Secondary;
|
||||
shard_a
|
||||
.schedule(&mut scheduler, &mut schedule_context)
|
||||
.unwrap();
|
||||
assert_eq!(shard_a.intent.get_attached(), &None);
|
||||
assert_eq!(shard_a.intent.get_secondary(), &vec![NodeId(1)]);
|
||||
assert_eq!(
|
||||
shard_a.optimize_attachment(&mut scheduler, &schedule_context),
|
||||
None
|
||||
);
|
||||
assert_eq!(
|
||||
shard_a.optimize_secondary(&mut scheduler, &schedule_context),
|
||||
None
|
||||
);
|
||||
|
||||
shard_a.intent.clear(&mut scheduler);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// Optimize til quiescent: this emulates what Service::optimize_all does, when
|
||||
// called repeatedly in the background.
|
||||
// Returns the applied optimizations
|
||||
|
||||
@@ -2766,6 +2766,11 @@ class NeonPageserver(PgProtocol, LogUtils):
|
||||
log.error(f"Failed to decode LocationConf, raw content ({len(bytes)} bytes): {bytes}")
|
||||
raise
|
||||
|
||||
def heatmap_content(self, tenant_shard_id: TenantId | TenantShardId) -> Any:
|
||||
path = self.tenant_dir(tenant_shard_id) / "heatmap-v1.json"
|
||||
with open(path) as f:
|
||||
return json.load(f)
|
||||
|
||||
def tenant_create(
|
||||
self,
|
||||
tenant_id: TenantId,
|
||||
|
||||
@@ -34,16 +34,20 @@ def test_layer_map(neon_env_builder: NeonEnvBuilder, zenbenchmark):
|
||||
cur.execute("set log_statement = 'all'")
|
||||
cur.execute("create table t(x integer)")
|
||||
for _ in range(n_iters):
|
||||
cur.execute(f"insert into t values (generate_series(1,{n_records}))")
|
||||
with zenbenchmark.record_duration(f"insert into t values (generate_series(1,{n_records}))"):
|
||||
cur.execute(f"insert into t values (generate_series(1,{n_records}))")
|
||||
time.sleep(1)
|
||||
|
||||
cur.execute("vacuum t")
|
||||
with zenbenchmark.record_duration("vacuum t"):
|
||||
cur.execute("vacuum t")
|
||||
|
||||
with zenbenchmark.record_duration("test_query"):
|
||||
with zenbenchmark.record_duration("SELECT count(*) from t"):
|
||||
cur.execute("SELECT count(*) from t")
|
||||
assert cur.fetchone() == (n_iters * n_records,)
|
||||
|
||||
flush_ep_to_pageserver(env, endpoint, tenant, timeline)
|
||||
env.pageserver.http_client().timeline_checkpoint(
|
||||
tenant, timeline, compact=False, wait_until_uploaded=True
|
||||
)
|
||||
with zenbenchmark.record_duration("flush_ep_to_pageserver"):
|
||||
flush_ep_to_pageserver(env, endpoint, tenant, timeline)
|
||||
with zenbenchmark.record_duration("timeline_checkpoint"):
|
||||
env.pageserver.http_client().timeline_checkpoint(
|
||||
tenant, timeline, compact=False, wait_until_uploaded=True
|
||||
)
|
||||
|
||||
@@ -136,7 +136,7 @@ def run_command_and_log_output(command, log_file_path: Path):
|
||||
"LD_LIBRARY_PATH": f"{os.getenv('PGCOPYDB_LIB_PATH')}:{os.getenv('PG_16_LIB_PATH')}",
|
||||
"PGCOPYDB_SOURCE_PGURI": cast(str, os.getenv("BENCHMARK_INGEST_SOURCE_CONNSTR")),
|
||||
"PGCOPYDB_TARGET_PGURI": cast(str, os.getenv("BENCHMARK_INGEST_TARGET_CONNSTR")),
|
||||
"PGOPTIONS": "-c maintenance_work_mem=8388608 -c max_parallel_maintenance_workers=7",
|
||||
"PGOPTIONS": "-c idle_in_transaction_session_timeout=0 -c maintenance_work_mem=8388608 -c max_parallel_maintenance_workers=7",
|
||||
}
|
||||
# Combine the current environment with custom variables
|
||||
env = os.environ.copy()
|
||||
|
||||
@@ -29,6 +29,21 @@ AGGRESSIVE_COMPACTION_TENANT_CONF = {
|
||||
# "lsn_lease_length": "0s", -- TODO: would cause branch creation errors, should fix later
|
||||
}
|
||||
|
||||
PREEMPT_COMPACTION_TENANT_CONF = {
|
||||
"gc_period": "5s",
|
||||
"compaction_period": "5s",
|
||||
# Small checkpoint distance to create many layers
|
||||
"checkpoint_distance": 1024**2,
|
||||
# Compact small layers
|
||||
"compaction_target_size": 1024**2,
|
||||
"image_creation_threshold": 1,
|
||||
"image_creation_preempt_threshold": 1,
|
||||
# compact more frequently
|
||||
"compaction_threshold": 3,
|
||||
"compaction_upper_limit": 6,
|
||||
"lsn_lease_length": "0s",
|
||||
}
|
||||
|
||||
|
||||
@skip_in_debug_build("only run with release build")
|
||||
@pytest.mark.parametrize(
|
||||
@@ -36,7 +51,8 @@ AGGRESSIVE_COMPACTION_TENANT_CONF = {
|
||||
[PageserverWalReceiverProtocol.VANILLA, PageserverWalReceiverProtocol.INTERPRETED],
|
||||
)
|
||||
def test_pageserver_compaction_smoke(
|
||||
neon_env_builder: NeonEnvBuilder, wal_receiver_protocol: PageserverWalReceiverProtocol
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
wal_receiver_protocol: PageserverWalReceiverProtocol,
|
||||
):
|
||||
"""
|
||||
This is a smoke test that compaction kicks in. The workload repeatedly churns
|
||||
@@ -54,7 +70,8 @@ def test_pageserver_compaction_smoke(
|
||||
page_cache_size=10
|
||||
"""
|
||||
|
||||
env = neon_env_builder.init_start(initial_tenant_conf=AGGRESSIVE_COMPACTION_TENANT_CONF)
|
||||
conf = AGGRESSIVE_COMPACTION_TENANT_CONF.copy()
|
||||
env = neon_env_builder.init_start(initial_tenant_conf=conf)
|
||||
|
||||
tenant_id = env.initial_tenant
|
||||
timeline_id = env.initial_timeline
|
||||
@@ -113,6 +130,41 @@ page_cache_size=10
|
||||
assert vectored_average < 8
|
||||
|
||||
|
||||
@skip_in_debug_build("only run with release build")
|
||||
def test_pageserver_compaction_preempt(
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
):
|
||||
# Ideally we should be able to do unit tests for this, but we need real Postgres
|
||||
# WALs in order to do unit testing...
|
||||
|
||||
conf = PREEMPT_COMPACTION_TENANT_CONF.copy()
|
||||
env = neon_env_builder.init_start(initial_tenant_conf=conf)
|
||||
|
||||
tenant_id = env.initial_tenant
|
||||
timeline_id = env.initial_timeline
|
||||
|
||||
row_count = 200000
|
||||
churn_rounds = 10
|
||||
|
||||
ps_http = env.pageserver.http_client()
|
||||
|
||||
workload = Workload(env, tenant_id, timeline_id)
|
||||
workload.init(env.pageserver.id)
|
||||
|
||||
log.info("Writing initial data ...")
|
||||
workload.write_rows(row_count, env.pageserver.id)
|
||||
|
||||
for i in range(1, churn_rounds + 1):
|
||||
log.info(f"Running churn round {i}/{churn_rounds} ...")
|
||||
workload.churn_rows(row_count, env.pageserver.id, upload=False)
|
||||
workload.validate(env.pageserver.id)
|
||||
ps_http.timeline_compact(tenant_id, timeline_id, wait_until_uploaded=True)
|
||||
log.info("Validating at workload end ...")
|
||||
workload.validate(env.pageserver.id)
|
||||
# ensure image layer creation gets preempted and then resumed
|
||||
env.pageserver.assert_log_contains("resuming image layer creation")
|
||||
|
||||
|
||||
@skip_in_debug_build("only run with release build")
|
||||
@pytest.mark.parametrize(
|
||||
"with_branches",
|
||||
@@ -250,6 +302,9 @@ def test_pageserver_gc_compaction_idempotent(
|
||||
workload.churn_rows(row_count, env.pageserver.id)
|
||||
# compact 3 times if mode is before_restart
|
||||
n_compactions = 3 if compaction_mode == "before_restart" else 1
|
||||
ps_http.timeline_compact(
|
||||
tenant_id, timeline_id, force_l0_compaction=True, wait_until_uploaded=True
|
||||
)
|
||||
for _ in range(n_compactions):
|
||||
# Force refresh gc info to have gc_cutoff generated
|
||||
ps_http.timeline_gc(tenant_id, timeline_id, None)
|
||||
|
||||
@@ -95,6 +95,8 @@ def test_remote_extensions(
|
||||
|
||||
# mock remote_extensions spec
|
||||
spec: dict[str, Any] = {
|
||||
"public_extensions": ["anon"],
|
||||
"custom_extensions": None,
|
||||
"library_index": {
|
||||
"anon": "anon",
|
||||
},
|
||||
|
||||
@@ -443,7 +443,7 @@ def test_heatmap_uploads(neon_env_builder: NeonEnvBuilder):
|
||||
workload.write_rows(256, env.pageservers[0].id)
|
||||
env.pageserver.http_client().tenant_heatmap_upload(tenant_id)
|
||||
|
||||
def validate_heatmap(heatmap):
|
||||
def validate_heatmap(heatmap, on_disk_heatmap):
|
||||
assert len(heatmap["timelines"]) == 1
|
||||
assert heatmap["timelines"][0]["timeline_id"] == str(timeline_id)
|
||||
assert len(heatmap["timelines"][0]["layers"]) > 0
|
||||
@@ -452,10 +452,13 @@ def test_heatmap_uploads(neon_env_builder: NeonEnvBuilder):
|
||||
# Each layer appears at most once
|
||||
assert len(set(layer["name"] for layer in layers)) == len(layers)
|
||||
|
||||
assert heatmap == on_disk_heatmap
|
||||
|
||||
# Download and inspect the heatmap that the pageserver uploaded
|
||||
heatmap_first = env.pageserver_remote_storage.heatmap_content(tenant_id)
|
||||
heatmap_first_on_disk = env.pageserver.heatmap_content(tenant_id)
|
||||
log.info(f"Read back heatmap: {heatmap_first}")
|
||||
validate_heatmap(heatmap_first)
|
||||
validate_heatmap(heatmap_first, heatmap_first_on_disk)
|
||||
|
||||
# Do some more I/O to generate more layers
|
||||
workload.churn_rows(64, env.pageservers[0].id)
|
||||
@@ -463,9 +466,10 @@ def test_heatmap_uploads(neon_env_builder: NeonEnvBuilder):
|
||||
|
||||
# Ensure that another heatmap upload includes the new layers
|
||||
heatmap_second = env.pageserver_remote_storage.heatmap_content(tenant_id)
|
||||
heatmap_second_on_disk = env.pageserver.heatmap_content(tenant_id)
|
||||
log.info(f"Read back heatmap: {heatmap_second}")
|
||||
assert heatmap_second != heatmap_first
|
||||
validate_heatmap(heatmap_second)
|
||||
validate_heatmap(heatmap_second, heatmap_second_on_disk)
|
||||
|
||||
|
||||
def list_elegible_layers(
|
||||
|
||||
2
vendor/postgres-v16
vendored
2
vendor/postgres-v16
vendored
Submodule vendor/postgres-v16 updated: 3cf7ce1afa...86d9ea96eb
2
vendor/postgres-v17
vendored
2
vendor/postgres-v17
vendored
Submodule vendor/postgres-v17 updated: f0ffc8279d...8dfd5a7030
4
vendor/revisions.json
vendored
4
vendor/revisions.json
vendored
@@ -1,11 +1,11 @@
|
||||
{
|
||||
"v17": [
|
||||
"17.2",
|
||||
"f0ffc8279dbcbbc439981a4fd001a9687e5d665d"
|
||||
"8dfd5a7030d3e8a98b60265ebe045788892ac7f3"
|
||||
],
|
||||
"v16": [
|
||||
"16.6",
|
||||
"3cf7ce1afab75027716d14223f95ddb300754162"
|
||||
"86d9ea96ebb9088eac62f57f1f5ace68e70e0d1c"
|
||||
],
|
||||
"v15": [
|
||||
"15.10",
|
||||
|
||||
@@ -92,6 +92,7 @@ tonic = { version = "0.12", default-features = false, features = ["codegen", "pr
|
||||
tower = { version = "0.4", default-features = false, features = ["balance", "buffer", "limit", "util"] }
|
||||
tracing = { version = "0.1", features = ["log"] }
|
||||
tracing-core = { version = "0.1" }
|
||||
tracing-log = { version = "0.2" }
|
||||
url = { version = "2", features = ["serde"] }
|
||||
zerocopy = { version = "0.7", features = ["derive", "simd"] }
|
||||
zeroize = { version = "1", features = ["derive", "serde"] }
|
||||
|
||||
Reference in New Issue
Block a user