Compare commits

..

1 Commits

Author SHA1 Message Date
John Spray
d1deef0d6b tests: make reconcile_until_idle more aggressive by default 2025-02-04 14:47:10 +01:00
32 changed files with 235 additions and 1734 deletions

55
Cargo.lock generated
View File

@@ -206,16 +206,6 @@ dependencies = [
"syn 2.0.90",
]
[[package]]
name = "assert-json-diff"
version = "2.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "47e4f2b81832e72834d7518d8487a0396a28cc408186a2e8854c0f98011faf12"
dependencies = [
"serde",
"serde_json",
]
[[package]]
name = "async-channel"
version = "1.9.0"
@@ -1020,12 +1010,6 @@ dependencies = [
"generic-array",
]
[[package]]
name = "boxcar"
version = "0.2.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2721c3c5a6f0e7f7e607125d963fedeb765f545f67adc9d71ed934693881eb42"
[[package]]
name = "bstr"
version = "1.5.0"
@@ -2449,16 +2433,6 @@ dependencies = [
"wasm-bindgen",
]
[[package]]
name = "gettid"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "397256552fed4a9e577850498071831ec8f18ea83368aecc114cab469dcb43e5"
dependencies = [
"libc",
"winapi",
]
[[package]]
name = "gimli"
version = "0.31.1"
@@ -4238,16 +4212,6 @@ dependencies = [
"workspace_hack",
]
[[package]]
name = "papaya"
version = "0.1.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dc7c76487f7eaa00a0fc1d7f88dc6b295aec478d11b0fc79f857b62c2874124c"
dependencies = [
"equivalent",
"seize",
]
[[package]]
name = "parking"
version = "2.1.1"
@@ -4875,7 +4839,6 @@ dependencies = [
"ahash",
"anyhow",
"arc-swap",
"assert-json-diff",
"async-compression",
"async-trait",
"atomic-take",
@@ -4883,7 +4846,6 @@ dependencies = [
"aws-sdk-iam",
"aws-sigv4",
"base64 0.13.1",
"boxcar",
"bstr",
"bytes",
"camino",
@@ -4900,7 +4862,6 @@ dependencies = [
"flate2",
"framed-websockets",
"futures",
"gettid",
"hashbrown 0.14.5",
"hashlink",
"hex",
@@ -4923,9 +4884,7 @@ dependencies = [
"measured",
"metrics",
"once_cell",
"opentelemetry",
"p256 0.13.2",
"papaya",
"parking_lot 0.12.1",
"parquet",
"parquet_derive",
@@ -4972,9 +4931,6 @@ dependencies = [
"tokio-tungstenite 0.21.0",
"tokio-util",
"tracing",
"tracing-log",
"tracing-opentelemetry",
"tracing-serde",
"tracing-subscriber",
"tracing-utils",
"try-lock",
@@ -5928,16 +5884,6 @@ dependencies = [
"libc",
]
[[package]]
name = "seize"
version = "0.4.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d84b0c858bdd30cb56f5597f8b3bf702ec23829e652cc636a1e5a7b9de46ae93"
dependencies = [
"libc",
"windows-sys 0.52.0",
]
[[package]]
name = "semver"
version = "1.0.17"
@@ -8199,7 +8145,6 @@ dependencies = [
"tower 0.4.13",
"tracing",
"tracing-core",
"tracing-log",
"url",
"zerocopy",
"zeroize",

View File

@@ -54,7 +54,6 @@ async-compression = { version = "0.4.0", features = ["tokio", "gzip", "zstd"] }
atomic-take = "1.1.0"
backtrace = "0.3.74"
flate2 = "1.0.26"
assert-json-diff = "2"
async-stream = "0.3"
async-trait = "0.1"
aws-config = { version = "1.5", default-features = false, features=["rustls", "sso"] }
@@ -194,9 +193,7 @@ tower-http = { version = "0.6.2", features = ["request-id", "trace"] }
tower-service = "0.3.3"
tracing = "0.1"
tracing-error = "0.2"
tracing-log = "0.2"
tracing-opentelemetry = "0.28"
tracing-serde = "0.2.0"
tracing-subscriber = { version = "0.3", default-features = false, features = ["smallvec", "fmt", "tracing-log", "std", "env-filter", "json"] }
try-lock = "0.2.5"
twox-hash = { version = "1.6.3", default-features = false }

View File

@@ -3,13 +3,8 @@ ARG DEBIAN_VERSION=bookworm
FROM debian:bookworm-slim AS pgcopydb_builder
ARG DEBIAN_VERSION
# Use strict mode for bash to catch errors early
SHELL ["/bin/bash", "-euo", "pipefail", "-c"]
# By default, /bin/sh used in debian images will treat '\n' as eol,
# but as we use bash as SHELL, and built-in echo in bash requires '-e' flag for that.
RUN echo 'Acquire::Retries "5";' > /etc/apt/apt.conf.d/80-retries && \
echo -e "retry_connrefused = on\ntimeout=15\ntries=5\n" > /root/.wgetrc && \
echo -e "retry_connrefused = on\ntimeout=15\ntries=5\n" > /root/.wgetrc \
echo -e "--retry-connrefused\n--connect-timeout 15\n--retry 5\n--max-time 300\n" > /root/.curlrc
RUN if [ "${DEBIAN_VERSION}" = "bookworm" ]; then \
@@ -60,8 +55,7 @@ ARG DEBIAN_VERSION
# Add nonroot user
RUN useradd -ms /bin/bash nonroot -b /home
# Use strict mode for bash to catch errors early
SHELL ["/bin/bash", "-euo", "pipefail", "-c"]
SHELL ["/bin/bash", "-c"]
RUN mkdir -p /pgcopydb/bin && \
mkdir -p /pgcopydb/lib && \
@@ -72,7 +66,7 @@ COPY --from=pgcopydb_builder /usr/lib/postgresql/16/bin/pgcopydb /pgcopydb/bin/p
COPY --from=pgcopydb_builder /pgcopydb/lib/libpq.so.5 /pgcopydb/lib/libpq.so.5
RUN echo 'Acquire::Retries "5";' > /etc/apt/apt.conf.d/80-retries && \
echo -e "retry_connrefused = on\ntimeout=15\ntries=5\n" > /root/.wgetrc && \
echo -e "retry_connrefused = on\ntimeout=15\ntries=5\n" > /root/.wgetrc \
echo -e "--retry-connrefused\n--connect-timeout 15\n--retry 5\n--max-time 300\n" > /root/.curlrc
# System deps
@@ -196,14 +190,8 @@ RUN set -e \
# It includes several bug fixes on top on v2.0 release (https://github.com/linux-test-project/lcov/compare/v2.0...master)
# And patches from us:
# - Generates json file with code coverage summary (https://github.com/neondatabase/lcov/commit/426e7e7a22f669da54278e9b55e6d8caabd00af0.tar.gz)
RUN set +o pipefail && \
for package in Capture::Tiny DateTime Devel::Cover Digest::MD5 File::Spec JSON::XS Memory::Process Time::HiRes JSON; do \
yes | perl -MCPAN -e "CPAN::Shell->notest('install', '$package')";\
done && \
set -o pipefail
# Split into separate step to debug flaky failures here
RUN wget https://github.com/neondatabase/lcov/archive/426e7e7a22f669da54278e9b55e6d8caabd00af0.tar.gz -O lcov.tar.gz \
&& ls -laht lcov.tar.gz && sha256sum lcov.tar.gz \
RUN for package in Capture::Tiny DateTime Devel::Cover Digest::MD5 File::Spec JSON::XS Memory::Process Time::HiRes JSON; do yes | perl -MCPAN -e "CPAN::Shell->notest('install', '$package')"; done \
&& wget https://github.com/neondatabase/lcov/archive/426e7e7a22f669da54278e9b55e6d8caabd00af0.tar.gz -O lcov.tar.gz \
&& echo "61a22a62e20908b8b9e27d890bd0ea31f567a7b9668065589266371dcbca0992 lcov.tar.gz" | sha256sum --check \
&& mkdir -p lcov && tar -xzf lcov.tar.gz -C lcov --strip-components=1 \
&& cd lcov \

View File

@@ -85,10 +85,6 @@ ARG DEBIAN_VERSION=bookworm
ARG DEBIAN_FLAVOR=${DEBIAN_VERSION}-slim
ARG ALPINE_CURL_VERSION=8.11.1
# By default, build all PostgreSQL extensions. For quick local testing when you don't
# care about the extensions, pass EXTENSIONS=none or EXTENSIONS=minimal
ARG EXTENSIONS=all
#########################################################################################
#
# Layer "build-deps"
@@ -100,10 +96,8 @@ ARG DEBIAN_VERSION
# Use strict mode for bash to catch errors early
SHELL ["/bin/bash", "-euo", "pipefail", "-c"]
# By default, /bin/sh used in debian images will treat '\n' as eol,
# but as we use bash as SHELL, and built-in echo in bash requires '-e' flag for that.
RUN echo 'Acquire::Retries "5";' > /etc/apt/apt.conf.d/80-retries && \
echo -e "retry_connrefused = on\ntimeout=15\ntries=5\n" > /root/.wgetrc && \
echo -e "retry_connrefused = on\ntimeout=15\ntries=5\n" > /root/.wgetrc \
echo -e "--retry-connrefused\n--connect-timeout 15\n--retry 5\n--max-time 300\n" > /root/.curlrc
RUN case $DEBIAN_VERSION in \
@@ -1074,7 +1068,6 @@ ENV PATH="/home/nonroot/.cargo/bin:$PATH"
USER nonroot
WORKDIR /home/nonroot
# See comment on the top of the file regading `echo` and `\n`
RUN echo -e "--retry-connrefused\n--connect-timeout 15\n--retry 5\n--max-time 300\n" > /home/nonroot/.curlrc
RUN curl -sSO https://static.rust-lang.org/rustup/dist/$(uname -m)-unknown-linux-gnu/rustup-init && \
@@ -1488,35 +1481,12 @@ RUN make -j $(getconf _NPROCESSORS_ONLN) \
#########################################################################################
#
# Layer "extensions-none"
#
#########################################################################################
FROM build-deps AS extensions-none
RUN mkdir /usr/local/pgsql
#########################################################################################
#
# Layer "extensions-minimal"
#
# This subset of extensions includes the extensions that we have in
# shared_preload_libraries by default.
#
#########################################################################################
FROM build-deps AS extensions-minimal
COPY --from=pgrag-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=timescaledb-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=pg_cron-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=pg_partman-build /usr/local/pgsql/ /usr/local/pgsql/
#########################################################################################
#
# Layer "extensions-all"
# Layer "all-extensions"
# Bundle together all the extensions
#
#########################################################################################
FROM build-deps AS extensions-all
FROM build-deps AS all-extensions
ARG PG_VERSION
# Public extensions
COPY --from=postgis-build /usr/local/pgsql/ /usr/local/pgsql/
@@ -1558,13 +1528,7 @@ COPY --from=pg_partman-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=pg_mooncake-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=pg_repack-build /usr/local/pgsql/ /usr/local/pgsql/
#########################################################################################
#
# Layer "neon-pg-ext-build"
# Includes Postgres and all the extensions chosen by EXTENSIONS arg.
#
#########################################################################################
FROM extensions-${EXTENSIONS} AS neon-pg-ext-build
COPY --from=neon-ext-build /usr/local/pgsql/ /usr/local/pgsql/
#########################################################################################
#
@@ -1620,7 +1584,6 @@ FROM alpine/curl:${ALPINE_CURL_VERSION} AS exporters
ARG TARGETARCH
# Keep sql_exporter version same as in build-tools.Dockerfile and
# test_runner/regress/test_compute_metrics.py
# See comment on the top of the file regading `echo`, `-e` and `\n`
RUN echo -e "--retry-connrefused\n--connect-timeout 15\n--retry 5\n--max-time 300\n" > /root/.curlrc; \
if [ "$TARGETARCH" = "amd64" ]; then\
postgres_exporter_sha256='027e75dda7af621237ff8f5ac66b78a40b0093595f06768612b92b1374bd3105';\
@@ -1647,8 +1610,7 @@ RUN echo -e "--retry-connrefused\n--connect-timeout 15\n--retry 5\n--max-time 30
#
#########################################################################################
FROM neon-ext-build AS postgres-cleanup-layer
COPY --from=neon-pg-ext-build /usr/local/pgsql /usr/local/pgsql
COPY --from=all-extensions /usr/local/pgsql /usr/local/pgsql
# Remove binaries from /bin/ that we won't use (or would manually copy & install otherwise)
RUN cd /usr/local/pgsql/bin && rm -f ecpg raster2pgsql shp2pgsql pgtopo_export pgtopo_import pgsql2shp
@@ -1738,10 +1700,6 @@ ENV PGDATABASE=postgres
#########################################################################################
FROM debian:$DEBIAN_FLAVOR
ARG DEBIAN_VERSION
# Use strict mode for bash to catch errors early
SHELL ["/bin/bash", "-euo", "pipefail", "-c"]
# Add user postgres
RUN mkdir /var/db && useradd -m -d /var/db/postgres postgres && \
echo "postgres:test_console_pass" | chpasswd && \

View File

@@ -32,7 +32,6 @@ reason = "the marvin attack only affects private key decryption, not public key
# https://embarkstudios.github.io/cargo-deny/checks/licenses/cfg.html
[licenses]
allow = [
"0BSD",
"Apache-2.0",
"BSD-2-Clause",
"BSD-3-Clause",

View File

@@ -52,7 +52,6 @@ for pg_version in ${TEST_VERSION_ONLY-14 15 16 17}; do
if [ $pg_version -ge 16 ]; then
docker cp ext-src $TEST_CONTAINER_NAME:/
docker exec $TEST_CONTAINER_NAME bash -c "apt update && apt install -y libtap-parser-sourcehandler-pgtap-perl"
# This is required for the pg_hint_plan test, to prevent flaky log message causing the test to fail
# It cannot be moved to Dockerfile now because the database directory is created after the start of the container
echo Adding dummy config

View File

@@ -1,4 +0,0 @@
#!/bin/bash
set -ex
cd "$(dirname "${0}")"
pg_prove test.sql

View File

@@ -1,15 +0,0 @@
diff --git a/test.sql b/test.sql
index d7a0ca8..f15bc76 100644
--- a/test.sql
+++ b/test.sql
@@ -9,9 +9,7 @@
\set ON_ERROR_STOP true
\set QUIET 1
-CREATE EXTENSION pgcrypto;
-CREATE EXTENSION pgtap;
-CREATE EXTENSION pgjwt;
+CREATE EXTENSION IF NOT EXISTS pgtap;
BEGIN;
SELECT plan(23);

View File

@@ -1,5 +0,0 @@
#!/bin/sh
set -ex
cd "$(dirname ${0})"
patch -p1 <test-upgrade.patch
pg_prove test.sql

View File

@@ -24,7 +24,7 @@ function wait_for_ready {
}
function create_extensions() {
for ext in ${1}; do
docker compose exec neon-test-extensions psql -X -v ON_ERROR_STOP=1 -d contrib_regression -c "CREATE EXTENSION IF NOT EXISTS ${ext} CASCADE"
docker compose exec neon-test-extensions psql -X -v ON_ERROR_STOP=1 -d contrib_regression -c "CREATE EXTENSION IF NOT EXISTS ${ext}"
done
}
EXTENSIONS='[
@@ -40,8 +40,7 @@ EXTENSIONS='[
{"extname": "pg_uuidv7", "extdir": "pg_uuidv7-src"},
{"extname": "roaringbitmap", "extdir": "pg_roaringbitmap-src"},
{"extname": "semver", "extdir": "pg_semver-src"},
{"extname": "pg_ivm", "extdir": "pg_ivm-src"},
{"extname": "pgjwt", "extdir": "pgjwt-src"}
{"extname": "pg_ivm", "extdir": "pg_ivm-src"}
]'
EXTNAMES=$(echo ${EXTENSIONS} | jq -r '.[].extname' | paste -sd ' ' -)
TAG=${NEWTAG} docker compose --profile test-extensions up --quiet-pull --build -d

View File

@@ -204,16 +204,14 @@ impl RemoteExtSpec {
// Check if extension is present in public or custom.
// If not, then it is not allowed to be used by this compute.
if !self
.public_extensions
.as_ref()
.is_some_and(|exts| exts.iter().any(|e| e == ext_name))
&& !self
.custom_extensions
.as_ref()
.is_some_and(|exts| exts.iter().any(|e| e == ext_name))
{
return Err(anyhow::anyhow!("extension {} is not found", real_ext_name));
if let Some(public_extensions) = &self.public_extensions {
if !public_extensions.contains(&real_ext_name.to_string()) {
if let Some(custom_extensions) = &self.custom_extensions {
if !custom_extensions.contains(&real_ext_name.to_string()) {
return Err(anyhow::anyhow!("extension {} is not found", real_ext_name));
}
}
}
}
match self.extension_data.get(real_ext_name) {
@@ -342,96 +340,6 @@ mod tests {
use super::*;
use std::fs::File;
#[test]
fn allow_installing_remote_extensions() {
let rspec: RemoteExtSpec = serde_json::from_value(serde_json::json!({
"public_extensions": null,
"custom_extensions": null,
"library_index": {},
"extension_data": {},
}))
.unwrap();
rspec
.get_ext("ext", false, "latest", "v17")
.expect_err("Extension should not be found");
let rspec: RemoteExtSpec = serde_json::from_value(serde_json::json!({
"public_extensions": [],
"custom_extensions": null,
"library_index": {},
"extension_data": {},
}))
.unwrap();
rspec
.get_ext("ext", false, "latest", "v17")
.expect_err("Extension should not be found");
let rspec: RemoteExtSpec = serde_json::from_value(serde_json::json!({
"public_extensions": [],
"custom_extensions": [],
"library_index": {
"ext": "ext"
},
"extension_data": {
"ext": {
"control_data": {
"ext.control": ""
},
"archive_path": ""
}
},
}))
.unwrap();
rspec
.get_ext("ext", false, "latest", "v17")
.expect_err("Extension should not be found");
let rspec: RemoteExtSpec = serde_json::from_value(serde_json::json!({
"public_extensions": [],
"custom_extensions": ["ext"],
"library_index": {
"ext": "ext"
},
"extension_data": {
"ext": {
"control_data": {
"ext.control": ""
},
"archive_path": ""
}
},
}))
.unwrap();
rspec
.get_ext("ext", false, "latest", "v17")
.expect("Extension should be found");
let rspec: RemoteExtSpec = serde_json::from_value(serde_json::json!({
"public_extensions": ["ext"],
"custom_extensions": [],
"library_index": {
"ext": "ext"
},
"extension_data": {
"ext": {
"control_data": {
"ext.control": ""
},
"archive_path": ""
}
},
}))
.unwrap();
rspec
.get_ext("ext", false, "latest", "v17")
.expect("Extension should be found");
}
#[test]
fn parse_spec_file() {
let file = File::open("tests/cluster_spec.json").unwrap();

View File

@@ -32,7 +32,6 @@ use utils::id::TimelineId;
use crate::config::PageServerConf;
use crate::context::{PageContentKind, RequestContext};
use crate::pgdatadir_mapping::DatadirModificationStats;
use crate::task_mgr::TaskKind;
use crate::tenant::layer_map::LayerMap;
use crate::tenant::mgr::TenantSlot;
@@ -2379,40 +2378,11 @@ pub(crate) struct WalIngestMetrics {
pub(crate) records_observed: IntCounter,
pub(crate) records_committed: IntCounter,
pub(crate) records_filtered: IntCounter,
pub(crate) values_committed_metadata_images: IntCounter,
pub(crate) values_committed_metadata_deltas: IntCounter,
pub(crate) values_committed_data_images: IntCounter,
pub(crate) values_committed_data_deltas: IntCounter,
pub(crate) gap_blocks_zeroed_on_rel_extend: IntCounter,
}
impl WalIngestMetrics {
pub(crate) fn inc_values_committed(&self, stats: &DatadirModificationStats) {
if stats.metadata_images > 0 {
self.values_committed_metadata_images
.inc_by(stats.metadata_images);
}
if stats.metadata_deltas > 0 {
self.values_committed_metadata_deltas
.inc_by(stats.metadata_deltas);
}
if stats.data_images > 0 {
self.values_committed_data_images.inc_by(stats.data_images);
}
if stats.data_deltas > 0 {
self.values_committed_data_deltas.inc_by(stats.data_deltas);
}
}
pub(crate) clear_vm_bits_unknown: IntCounterVec,
}
pub(crate) static WAL_INGEST: Lazy<WalIngestMetrics> = Lazy::new(|| {
let values_committed = register_int_counter_vec!(
"pageserver_wal_ingest_values_committed",
"Number of values committed to pageserver storage from WAL records",
&["class", "kind"],
)
.expect("failed to define a metric");
WalIngestMetrics {
bytes_received: register_int_counter!(
"pageserver_wal_ingest_bytes_received",
@@ -2439,15 +2409,17 @@ pub(crate) static WAL_INGEST: Lazy<WalIngestMetrics> = Lazy::new(|| {
"Number of WAL records filtered out due to sharding"
)
.expect("failed to define a metric"),
values_committed_metadata_images: values_committed.with_label_values(&["metadata", "image"]),
values_committed_metadata_deltas: values_committed.with_label_values(&["metadata", "delta"]),
values_committed_data_images: values_committed.with_label_values(&["data", "image"]),
values_committed_data_deltas: values_committed.with_label_values(&["data", "delta"]),
gap_blocks_zeroed_on_rel_extend: register_int_counter!(
"pageserver_gap_blocks_zeroed_on_rel_extend",
"Total number of zero gap blocks written on relation extends"
)
.expect("failed to define a metric"),
clear_vm_bits_unknown: register_int_counter_vec!(
"pageserver_wal_ingest_clear_vm_bits_unknown",
"Number of ignored ClearVmBits operations due to unknown pages/relations",
&["entity"],
)
.expect("failed to define a metric"),
}
});

View File

@@ -48,7 +48,7 @@ use tracing::{debug, trace, warn};
use utils::bin_ser::DeserializeError;
use utils::pausable_failpoint;
use utils::{bin_ser::BeSer, lsn::Lsn};
use wal_decoder::serialized_batch::{SerializedValueBatch, ValueMeta};
use wal_decoder::serialized_batch::SerializedValueBatch;
/// Max delta records appended to the AUX_FILES_KEY (for aux v1). The write path will write a full image once this threshold is reached.
pub const MAX_AUX_FILE_DELTAS: usize = 1024;
@@ -1297,26 +1297,6 @@ impl DatadirModification<'_> {
.is_some_and(|b| b.has_data())
}
/// Returns statistics about the currently pending modifications.
pub(crate) fn stats(&self) -> DatadirModificationStats {
let mut stats = DatadirModificationStats::default();
for (_, _, value) in self.pending_metadata_pages.values().flatten() {
match value {
Value::Image(_) => stats.metadata_images += 1,
Value::WalRecord(r) if r.will_init() => stats.metadata_images += 1,
Value::WalRecord(_) => stats.metadata_deltas += 1,
}
}
for valuemeta in self.pending_data_batch.iter().flat_map(|b| &b.metadata) {
match valuemeta {
ValueMeta::Serialized(s) if s.will_init => stats.data_images += 1,
ValueMeta::Serialized(_) => stats.data_deltas += 1,
ValueMeta::Observed(_) => {}
}
}
stats
}
/// Set the current lsn
pub(crate) fn set_lsn(&mut self, lsn: Lsn) -> anyhow::Result<()> {
ensure!(
@@ -2337,15 +2317,6 @@ impl DatadirModification<'_> {
}
}
/// Statistics for a DatadirModification.
#[derive(Default)]
pub struct DatadirModificationStats {
pub metadata_images: u64,
pub metadata_deltas: u64,
pub data_images: u64,
pub data_deltas: u64,
}
/// This struct facilitates accessing either a committed key from the timeline at a
/// specific LSN, or the latest uncommitted key from a pending modification.
///

View File

@@ -9,14 +9,13 @@ use crate::{
metrics::SECONDARY_MODE,
tenant::{
config::AttachmentMode,
mgr::{GetTenantError, TenantManager},
mgr::GetTenantError,
mgr::TenantManager,
remote_timeline_client::remote_heatmap_path,
span::debug_assert_current_span_has_tenant_id,
tasks::{warn_when_period_overrun, BackgroundLoopKind},
Tenant,
},
virtual_file::VirtualFile,
TEMP_FILE_SUFFIX,
};
use futures::Future;
@@ -33,10 +32,7 @@ use super::{
};
use tokio_util::sync::CancellationToken;
use tracing::{info_span, instrument, Instrument};
use utils::{
backoff, completion::Barrier, crashsafe::path_with_suffix_extension,
yielding_loop::yielding_loop,
};
use utils::{backoff, completion::Barrier, yielding_loop::yielding_loop};
pub(super) async fn heatmap_uploader_task(
tenant_manager: Arc<TenantManager>,
@@ -465,18 +461,6 @@ async fn upload_tenant_heatmap(
}
}
// After a successful upload persist the fresh heatmap to disk.
// When restarting, the tenant will read the heatmap from disk
// and additively generate a new heatmap (see [`Timeline::generate_heatmap`]).
// If the heatmap is stale, the additive generation can lead to keeping previously
// evicted timelines on the secondarie's disk.
let tenant_shard_id = tenant.get_tenant_shard_id();
let heatmap_path = tenant.conf.tenant_heatmap_path(tenant_shard_id);
let temp_path = path_with_suffix_extension(&heatmap_path, TEMP_FILE_SUFFIX);
if let Err(err) = VirtualFile::crashsafe_overwrite(heatmap_path, temp_path, bytes).await {
tracing::warn!("Non fatal IO error writing to disk after heatmap upload: {err}");
}
tracing::info!("Successfully uploaded {size} byte heatmap to {path}");
Ok(UploadHeatmapOutcome::Uploaded(LastUploadState {

View File

@@ -192,12 +192,7 @@ pub enum ImageLayerCreationMode {
#[derive(Clone, Debug, Default)]
pub enum LastImageLayerCreationStatus {
Incomplete {
/// The last key of the partition (exclusive) that was processed in the last
/// image layer creation attempt. We will continue from this key in the next
/// attempt.
last_key: Key,
},
Incomplete, // TODO: record the last key being processed
Complete,
#[default]
Initial,
@@ -4351,7 +4346,7 @@ impl Timeline {
Ok(result)
}
// Is it time to create a new image layer for the given partition? True if we want to generate.
// Is it time to create a new image layer for the given partition?
async fn time_for_new_image_layer(&self, partition: &KeySpace, lsn: Lsn) -> bool {
let threshold = self.get_image_creation_threshold();
@@ -4663,11 +4658,6 @@ impl Timeline {
) -> Result<(Vec<ResidentLayer>, LastImageLayerCreationStatus), CreateImageLayersError> {
let timer = self.metrics.create_images_time_histo.start_timer();
if partitioning.parts.is_empty() {
warn!("no partitions to create image layers for");
return Ok((vec![], LastImageLayerCreationStatus::Complete));
}
// We need to avoid holes between generated image layers.
// Otherwise LayerMap::image_layer_exists will return false if key range of some layer is covered by more than one
// image layer with hole between them. In this case such layer can not be utilized by GC.
@@ -4679,65 +4669,28 @@ impl Timeline {
// image layers <100000000..100000099> and <200000000..200000199> are not completely covering it.
let mut start = Key::MIN;
let check_for_image_layers =
if let LastImageLayerCreationStatus::Incomplete { last_key } = last_status {
info!(
"resuming image layer creation: last_status=incomplete, continue from {}",
last_key
);
true
} else {
self.should_check_if_image_layers_required(lsn)
};
let check_for_image_layers = if let LastImageLayerCreationStatus::Incomplete = last_status {
info!(
"resuming image layer creation: last_status={:?}",
last_status
);
true
} else {
self.should_check_if_image_layers_required(lsn)
};
let mut batch_image_writer = BatchLayerWriter::new(self.conf).await?;
let mut all_generated = true;
let mut partition_processed = 0;
let mut total_partitions = partitioning.parts.len();
let mut last_partition_processed = None;
let mut partition_parts = partitioning.parts.clone();
let total_partitions = partitioning.parts.len();
if let LastImageLayerCreationStatus::Incomplete { last_key } = last_status {
// We need to skip the partitions that have already been processed.
let mut found = false;
for (i, partition) in partition_parts.iter().enumerate() {
if last_key <= partition.end().unwrap() {
// ```plain
// |------|--------|----------|------|
// ^last_key
// ^start from this partition
// ```
// Why `i+1` instead of `i`?
// It is possible that the user did some writes after the previous image layer creation attempt so that
// a relation grows in size, and the last_key is now in the middle of the partition. In this case, we
// still want to skip this partition, so that we can make progress and avoid generating image layers over
// the same partition. Doing a mod to ensure we don't end up with an empty vec.
if i + 1 >= total_partitions {
// In general, this case should not happen -- if last_key is on the last partition, the previous
// iteration of image layer creation should return a complete status.
break; // with found=false
}
partition_parts = partition_parts.split_off(i + 1); // Remove the first i + 1 elements
total_partitions = partition_parts.len();
// Update the start key to the partition start.
start = partition_parts[0].start().unwrap();
found = true;
break;
}
}
if !found {
// Last key is within the last partition, or larger than all partitions.
return Ok((vec![], LastImageLayerCreationStatus::Complete));
}
}
for partition in partition_parts.iter() {
for partition in partitioning.parts.iter() {
if self.cancel.is_cancelled() {
return Err(CreateImageLayersError::Cancelled);
}
partition_processed += 1;
let img_range = start..partition.ranges.last().unwrap().end;
let compact_metadata = partition.overlaps(&Key::metadata_key_range());
if compact_metadata {
@@ -4772,8 +4725,6 @@ impl Timeline {
lsn_range: PersistentLayerDesc::image_layer_lsn_range(lsn),
is_delta: false,
}) {
// TODO: this can be processed with the BatchLayerWriter::finish_with_discard
// in the future.
tracing::info!(
"Skipping image layer at {lsn} {}..{}, already exists",
img_range.start,
@@ -4854,6 +4805,8 @@ impl Timeline {
}
}
partition_processed += 1;
if let ImageLayerCreationMode::Try = mode {
// We have at least made some progress
if batch_image_writer.pending_layer_num() >= 1 {
@@ -4869,10 +4822,8 @@ impl Timeline {
* self.get_compaction_threshold();
if image_preempt_threshold != 0 && num_of_l0_layers >= image_preempt_threshold {
tracing::info!(
"preempt image layer generation at {lsn} when processing partition {}..{}: too many L0 layers {}",
partition.start().unwrap(), partition.end().unwrap(), num_of_l0_layers
"preempt image layer generation at {start} at {lsn}: too many L0 layers {num_of_l0_layers}",
);
last_partition_processed = Some(partition.clone());
all_generated = false;
break;
}
@@ -4917,14 +4868,7 @@ impl Timeline {
if all_generated {
LastImageLayerCreationStatus::Complete
} else {
LastImageLayerCreationStatus::Incomplete {
last_key: if let Some(last_partition_processed) = last_partition_processed {
last_partition_processed.end().unwrap_or(Key::MIN)
} else {
// This branch should be unreachable, but in case it happens, we can just return the start key.
Key::MIN
},
}
LastImageLayerCreationStatus::Incomplete
},
))
}

View File

@@ -748,7 +748,7 @@ impl Timeline {
.store(Arc::new(outcome.clone()));
self.upload_new_image_layers(image_layers)?;
if let LastImageLayerCreationStatus::Incomplete { .. } = outcome {
if let LastImageLayerCreationStatus::Incomplete = outcome {
// Yield and do not do any other kind of compaction.
info!("skipping shard ancestor compaction due to pending image layer generation tasks (preempted by L0 compaction).");
return Ok(CompactionOutcome::Pending);

View File

@@ -355,19 +355,6 @@ pub(super) async fn handle_walreceiver_connection(
// advances it to its end LSN. 0 is just an initialization placeholder.
let mut modification = timeline.begin_modification(Lsn(0));
async fn commit(
modification: &mut DatadirModification<'_>,
ctx: &RequestContext,
uncommitted: &mut u64,
) -> anyhow::Result<()> {
let stats = modification.stats();
modification.commit(ctx).await?;
WAL_INGEST.records_committed.inc_by(*uncommitted);
WAL_INGEST.inc_values_committed(&stats);
*uncommitted = 0;
Ok(())
}
if !records.is_empty() {
timeline
.metrics
@@ -379,7 +366,8 @@ pub(super) async fn handle_walreceiver_connection(
if matches!(interpreted.flush_uncommitted, FlushUncommittedRecords::Yes)
&& uncommitted_records > 0
{
commit(&mut modification, &ctx, &mut uncommitted_records).await?;
modification.commit(&ctx).await?;
uncommitted_records = 0;
}
let local_next_record_lsn = interpreted.next_record_lsn;
@@ -408,7 +396,8 @@ pub(super) async fn handle_walreceiver_connection(
|| modification.approx_pending_bytes()
> DatadirModification::MAX_PENDING_BYTES
{
commit(&mut modification, &ctx, &mut uncommitted_records).await?;
modification.commit(&ctx).await?;
uncommitted_records = 0;
}
}
@@ -426,7 +415,7 @@ pub(super) async fn handle_walreceiver_connection(
if uncommitted_records > 0 || needs_last_record_lsn_advance {
// Commit any uncommitted records
commit(&mut modification, &ctx, &mut uncommitted_records).await?;
modification.commit(&ctx).await?;
}
if !caught_up && streaming_lsn >= end_of_wal {
@@ -453,12 +442,10 @@ pub(super) async fn handle_walreceiver_connection(
filtered: &mut u64,
ctx: &RequestContext,
) -> anyhow::Result<()> {
let stats = modification.stats();
modification.commit(ctx).await?;
WAL_INGEST
.records_committed
.inc_by(*uncommitted - *filtered);
WAL_INGEST.inc_values_committed(&stats);
modification.commit(ctx).await?;
*uncommitted = 0;
*filtered = 0;
Ok(())

View File

@@ -28,9 +28,17 @@ use std::time::Duration;
use std::time::Instant;
use std::time::SystemTime;
use pageserver_api::shard::ShardIdentity;
use postgres_ffi::fsm_logical_to_physical;
use postgres_ffi::walrecord::*;
use postgres_ffi::{dispatch_pgversion, enum_pgversion, enum_pgversion_dispatch, TimestampTz};
use wal_decoder::models::*;
use anyhow::{bail, Result};
use bytes::{Buf, Bytes};
use tracing::*;
use utils::failpoint_support;
use utils::rate_limit::RateLimit;
use crate::context::RequestContext;
use crate::metrics::WAL_INGEST;
@@ -42,18 +50,11 @@ use crate::ZERO_PAGE;
use pageserver_api::key::rel_block_to_key;
use pageserver_api::record::NeonWalRecord;
use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind};
use pageserver_api::shard::ShardIdentity;
use postgres_ffi::fsm_logical_to_physical;
use postgres_ffi::pg_constants;
use postgres_ffi::relfile_utils::{FSM_FORKNUM, INIT_FORKNUM, MAIN_FORKNUM, VISIBILITYMAP_FORKNUM};
use postgres_ffi::walrecord::*;
use postgres_ffi::TransactionId;
use postgres_ffi::{dispatch_pgversion, enum_pgversion, enum_pgversion_dispatch, TimestampTz};
use utils::bin_ser::SerializeError;
use utils::lsn::Lsn;
use utils::rate_limit::RateLimit;
use utils::{critical, failpoint_support};
use wal_decoder::models::*;
enum_pgversion! {CheckPoint, pgv::CheckPoint}
@@ -326,75 +327,93 @@ impl WalIngest {
let mut new_vm_blk = new_heap_blkno.map(pg_constants::HEAPBLK_TO_MAPBLOCK);
let mut old_vm_blk = old_heap_blkno.map(pg_constants::HEAPBLK_TO_MAPBLOCK);
// VM bits can only be cleared on the shard(s) owning the VM relation, and must be within
// its view of the VM relation size. Out of caution, error instead of failing WAL ingestion,
// as there has historically been cases where PostgreSQL has cleared spurious VM pages. See:
// https://github.com/neondatabase/neon/pull/10634.
// Sometimes, Postgres seems to create heap WAL records with the
// ALL_VISIBLE_CLEARED flag set, even though the bit in the VM page is
// not set. In fact, it's possible that the VM page does not exist at all.
// In that case, we don't want to store a record to clear the VM bit;
// replaying it would fail to find the previous image of the page, because
// it doesn't exist. So check if the VM page(s) exist, and skip the WAL
// record if it doesn't.
//
// TODO: analyze the metrics and tighten this up accordingly. This logic
// implicitly assumes that VM pages see explicit WAL writes before
// implicit ClearVmBits, and will otherwise silently drop updates.
let Some(vm_size) = get_relsize(modification, vm_rel, ctx).await? else {
critical!("clear_vm_bits for unknown VM relation {vm_rel}");
WAL_INGEST
.clear_vm_bits_unknown
.with_label_values(&["relation"])
.inc();
return Ok(());
};
if let Some(blknum) = new_vm_blk {
if blknum >= vm_size {
critical!("new_vm_blk {blknum} not in {vm_rel} of size {vm_size}");
WAL_INGEST
.clear_vm_bits_unknown
.with_label_values(&["new_page"])
.inc();
new_vm_blk = None;
}
}
if let Some(blknum) = old_vm_blk {
if blknum >= vm_size {
critical!("old_vm_blk {blknum} not in {vm_rel} of size {vm_size}");
WAL_INGEST
.clear_vm_bits_unknown
.with_label_values(&["old_page"])
.inc();
old_vm_blk = None;
}
}
if new_vm_blk.is_none() && old_vm_blk.is_none() {
return Ok(());
} else if new_vm_blk == old_vm_blk {
// An UPDATE record that needs to clear the bits for both old and the new page, both of
// which reside on the same VM page.
self.put_rel_wal_record(
modification,
vm_rel,
new_vm_blk.unwrap(),
NeonWalRecord::ClearVisibilityMapFlags {
new_heap_blkno,
old_heap_blkno,
flags,
},
ctx,
)
.await?;
} else {
// Clear VM bits for one heap page, or for two pages that reside on different VM pages.
if let Some(new_vm_blk) = new_vm_blk {
if new_vm_blk.is_some() || old_vm_blk.is_some() {
if new_vm_blk == old_vm_blk {
// An UPDATE record that needs to clear the bits for both old and the
// new page, both of which reside on the same VM page.
self.put_rel_wal_record(
modification,
vm_rel,
new_vm_blk,
new_vm_blk.unwrap(),
NeonWalRecord::ClearVisibilityMapFlags {
new_heap_blkno,
old_heap_blkno: None,
flags,
},
ctx,
)
.await?;
}
if let Some(old_vm_blk) = old_vm_blk {
self.put_rel_wal_record(
modification,
vm_rel,
old_vm_blk,
NeonWalRecord::ClearVisibilityMapFlags {
new_heap_blkno: None,
old_heap_blkno,
flags,
},
ctx,
)
.await?;
} else {
// Clear VM bits for one heap page, or for two pages that reside on
// different VM pages.
if let Some(new_vm_blk) = new_vm_blk {
self.put_rel_wal_record(
modification,
vm_rel,
new_vm_blk,
NeonWalRecord::ClearVisibilityMapFlags {
new_heap_blkno,
old_heap_blkno: None,
flags,
},
ctx,
)
.await?;
}
if let Some(old_vm_blk) = old_vm_blk {
self.put_rel_wal_record(
modification,
vm_rel,
old_vm_blk,
NeonWalRecord::ClearVisibilityMapFlags {
new_heap_blkno: None,
old_heap_blkno,
flags,
},
ctx,
)
.await?;
}
}
}
Ok(())
}

View File

@@ -220,8 +220,10 @@ lfc_maybe_disabled(void)
static bool
lfc_ensure_opened(void)
{
bool enabled = !lfc_maybe_disabled();
/* Open cache file if not done yet */
if (lfc_desc <= 0)
if (lfc_desc <= 0 && enabled)
{
lfc_desc = BasicOpenFile(lfc_path, O_RDWR);
@@ -231,7 +233,7 @@ lfc_ensure_opened(void)
return false;
}
}
return true;
return enabled;
}
static void
@@ -336,11 +338,10 @@ lfc_change_limit_hook(int newval, void *extra)
{
uint32 new_size = SIZE_MB_TO_CHUNKS(newval);
if (!lfc_ctl || !is_normal_backend())
if (!is_normal_backend())
return;
/* Open LFC file only if LFC was enabled or we are going to reenable it */
if ((newval > 0 || LFC_ENABLED()) && !lfc_ensure_opened())
if (!lfc_ensure_opened())
return;
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);

View File

@@ -19,7 +19,6 @@ aws-config.workspace = true
aws-sdk-iam.workspace = true
aws-sigv4.workspace = true
base64.workspace = true
boxcar = "0.2.8"
bstr.workspace = true
bytes = { workspace = true, features = ["serde"] }
camino.workspace = true
@@ -43,7 +42,6 @@ hyper0.workspace = true
hyper = { workspace = true, features = ["server", "http1", "http2"] }
hyper-util = { version = "0.1", features = ["server", "http1", "http2", "tokio"] }
http-body-util = { version = "0.1" }
gettid = "0.1.3"
indexmap = { workspace = true, features = ["serde"] }
ipnet.workspace = true
itertools.workspace = true
@@ -52,8 +50,6 @@ lasso = { workspace = true, features = ["multi-threaded"] }
measured = { workspace = true, features = ["lasso"] }
metrics.workspace = true
once_cell.workspace = true
opentelemetry = { workspace = true, features = ["trace"] }
papaya = "0.1.8"
parking_lot.workspace = true
parquet.workspace = true
parquet_derive.workspace = true
@@ -93,9 +89,6 @@ tokio = { workspace = true, features = ["signal"] }
tracing-subscriber.workspace = true
tracing-utils.workspace = true
tracing.workspace = true
tracing-log.workspace = true
tracing-serde.workspace = true
tracing-opentelemetry.workspace = true
try-lock.workspace = true
typed-json.workspace = true
url.workspace = true
@@ -119,7 +112,6 @@ rsa = "0.9"
workspace_hack.workspace = true
[dev-dependencies]
assert-json-diff.workspace = true
camino-tempfile.workspace = true
fallible-iterator.workspace = true
flate2.workspace = true

View File

@@ -1,23 +1,10 @@
use std::cell::{Cell, RefCell};
use std::collections::HashMap;
use std::hash::BuildHasher;
use std::{env, io};
use chrono::{DateTime, Utc};
use opentelemetry::trace::TraceContextExt;
use scopeguard::defer;
use serde::ser::{SerializeMap, Serializer};
use tracing::span;
use tracing::subscriber::Interest;
use tracing::{callsite, Event, Metadata, Span, Subscriber};
use tracing_opentelemetry::OpenTelemetrySpanExt;
use tracing::Subscriber;
use tracing_subscriber::filter::{EnvFilter, LevelFilter};
use tracing_subscriber::fmt::format::{Format, Full};
use tracing_subscriber::fmt::time::SystemTime;
use tracing_subscriber::fmt::{FormatEvent, FormatFields};
use tracing_subscriber::layer::{Context, Layer};
use tracing_subscriber::prelude::*;
use tracing_subscriber::registry::{LookupSpan, SpanRef};
use tracing_subscriber::registry::LookupSpan;
/// Initialize logging and OpenTelemetry tracing and exporter.
///
@@ -28,8 +15,6 @@ use tracing_subscriber::registry::{LookupSpan, SpanRef};
/// destination, set `OTEL_EXPORTER_OTLP_ENDPOINT=http://jaeger:4318`.
/// See <https://opentelemetry.io/docs/reference/specification/sdk-environment-variables>
pub async fn init() -> anyhow::Result<LoggingGuard> {
let logfmt = LogFormat::from_env()?;
let env_filter = EnvFilter::builder()
.with_default_directive(LevelFilter::INFO.into())
.from_env_lossy()
@@ -44,36 +29,17 @@ pub async fn init() -> anyhow::Result<LoggingGuard> {
.expect("this should be a valid filter directive"),
);
let fmt_layer = tracing_subscriber::fmt::layer()
.with_ansi(false)
.with_writer(std::io::stderr)
.with_target(false);
let otlp_layer = tracing_utils::init_tracing("proxy").await;
let json_log_layer = if logfmt == LogFormat::Json {
Some(JsonLoggingLayer {
clock: RealClock,
skipped_field_indices: papaya::HashMap::default(),
writer: StderrWriter {
stderr: std::io::stderr(),
},
})
} else {
None
};
let text_log_layer = if logfmt == LogFormat::Text {
Some(
tracing_subscriber::fmt::layer()
.with_ansi(false)
.with_writer(std::io::stderr)
.with_target(false),
)
} else {
None
};
tracing_subscriber::registry()
.with(env_filter)
.with(otlp_layer)
.with(json_log_layer)
.with(text_log_layer)
.with(fmt_layer)
.try_init()?;
Ok(LoggingGuard)
@@ -128,857 +94,3 @@ impl Drop for LoggingGuard {
tracing_utils::shutdown_tracing();
}
}
// TODO: make JSON the default
#[derive(Copy, Clone, PartialEq, Eq, Default, Debug)]
enum LogFormat {
#[default]
Text = 1,
Json,
}
impl LogFormat {
fn from_env() -> anyhow::Result<Self> {
let logfmt = env::var("LOGFMT");
Ok(match logfmt.as_deref() {
Err(_) => LogFormat::default(),
Ok("text") => LogFormat::Text,
Ok("json") => LogFormat::Json,
Ok(logfmt) => anyhow::bail!("unknown log format: {logfmt}"),
})
}
}
trait MakeWriter {
fn make_writer(&self) -> impl io::Write;
}
struct StderrWriter {
stderr: io::Stderr,
}
impl MakeWriter for StderrWriter {
#[inline]
fn make_writer(&self) -> impl io::Write {
self.stderr.lock()
}
}
// TODO: move into separate module or even separate crate.
trait Clock {
fn now(&self) -> DateTime<Utc>;
}
struct RealClock;
impl Clock for RealClock {
#[inline]
fn now(&self) -> DateTime<Utc> {
Utc::now()
}
}
/// Name of the field used by tracing crate to store the event message.
const MESSAGE_FIELD: &str = "message";
thread_local! {
/// Protects against deadlocks and double panics during log writing.
/// The current panic handler will use tracing to log panic information.
static REENTRANCY_GUARD: Cell<bool> = const { Cell::new(false) };
/// Thread-local instance with per-thread buffer for log writing.
static EVENT_FORMATTER: RefCell<EventFormatter> = RefCell::new(EventFormatter::new());
/// Cached OS thread ID.
static THREAD_ID: u64 = gettid::gettid();
}
/// Implements tracing layer to handle events specific to logging.
struct JsonLoggingLayer<C: Clock, W: MakeWriter> {
clock: C,
skipped_field_indices: papaya::HashMap<callsite::Identifier, SkippedFieldIndices>,
writer: W,
}
impl<S, C: Clock + 'static, W: MakeWriter + 'static> Layer<S> for JsonLoggingLayer<C, W>
where
S: Subscriber + for<'a> LookupSpan<'a>,
{
fn on_event(&self, event: &Event<'_>, ctx: Context<'_, S>) {
use std::io::Write;
// TODO: consider special tracing subscriber to grab timestamp very
// early, before OTel machinery, and add as event extension.
let now = self.clock.now();
let res: io::Result<()> = REENTRANCY_GUARD.with(move |entered| {
if entered.get() {
let mut formatter = EventFormatter::new();
formatter.format(now, event, &ctx, &self.skipped_field_indices)?;
self.writer.make_writer().write_all(formatter.buffer())
} else {
entered.set(true);
defer!(entered.set(false););
EVENT_FORMATTER.with_borrow_mut(move |formatter| {
formatter.reset();
formatter.format(now, event, &ctx, &self.skipped_field_indices)?;
self.writer.make_writer().write_all(formatter.buffer())
})
}
});
// In case logging fails we generate a simpler JSON object.
if let Err(err) = res {
if let Ok(mut line) = serde_json::to_vec(&serde_json::json!( {
"timestamp": now.to_rfc3339_opts(chrono::SecondsFormat::Micros, true),
"level": "ERROR",
"message": format_args!("cannot log event: {err:?}"),
"fields": {
"event": format_args!("{event:?}"),
},
})) {
line.push(b'\n');
self.writer.make_writer().write_all(&line).ok();
}
}
}
/// Registers a SpanFields instance as span extension.
fn on_new_span(&self, attrs: &span::Attributes<'_>, id: &span::Id, ctx: Context<'_, S>) {
let span = ctx.span(id).expect("span must exist");
let fields = SpanFields::default();
fields.record_fields(attrs);
// This could deadlock when there's a panic somewhere in the tracing
// event handling and a read or write guard is still held. This includes
// the OTel subscriber.
span.extensions_mut().insert(fields);
}
fn on_record(&self, id: &span::Id, values: &span::Record<'_>, ctx: Context<'_, S>) {
let span = ctx.span(id).expect("span must exist");
let ext = span.extensions();
if let Some(data) = ext.get::<SpanFields>() {
data.record_fields(values);
}
}
/// Called (lazily) whenever a new log call is executed. We quickly check
/// for duplicate field names and record duplicates as skippable. Last one
/// wins.
fn register_callsite(&self, metadata: &'static Metadata<'static>) -> Interest {
if !metadata.is_event() {
// Must not be never because we wouldn't get trace and span data.
return Interest::always();
}
let mut field_indices = SkippedFieldIndices::default();
let mut seen_fields = HashMap::<&'static str, usize>::new();
for field in metadata.fields() {
use std::collections::hash_map::Entry;
match seen_fields.entry(field.name()) {
Entry::Vacant(entry) => {
// field not seen yet
entry.insert(field.index());
}
Entry::Occupied(mut entry) => {
// replace currently stored index
let old_index = entry.insert(field.index());
// ... and append it to list of skippable indices
field_indices.push(old_index);
}
}
}
if !field_indices.is_empty() {
self.skipped_field_indices
.pin()
.insert(metadata.callsite(), field_indices);
}
Interest::always()
}
}
/// Stores span field values recorded during the spans lifetime.
#[derive(Default)]
struct SpanFields {
// TODO: Switch to custom enum with lasso::Spur for Strings?
fields: papaya::HashMap<&'static str, serde_json::Value>,
}
impl SpanFields {
#[inline]
fn record_fields<R: tracing_subscriber::field::RecordFields>(&self, fields: R) {
fields.record(&mut SpanFieldsRecorder {
fields: self.fields.pin(),
});
}
}
/// Implements a tracing field visitor to convert and store values.
struct SpanFieldsRecorder<'m, S, G> {
fields: papaya::HashMapRef<'m, &'static str, serde_json::Value, S, G>,
}
impl<S: BuildHasher, G: papaya::Guard> tracing::field::Visit for SpanFieldsRecorder<'_, S, G> {
#[inline]
fn record_f64(&mut self, field: &tracing::field::Field, value: f64) {
self.fields
.insert(field.name(), serde_json::Value::from(value));
}
#[inline]
fn record_i64(&mut self, field: &tracing::field::Field, value: i64) {
self.fields
.insert(field.name(), serde_json::Value::from(value));
}
#[inline]
fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
self.fields
.insert(field.name(), serde_json::Value::from(value));
}
#[inline]
fn record_i128(&mut self, field: &tracing::field::Field, value: i128) {
if let Ok(value) = i64::try_from(value) {
self.fields
.insert(field.name(), serde_json::Value::from(value));
} else {
self.fields
.insert(field.name(), serde_json::Value::from(format!("{value}")));
}
}
#[inline]
fn record_u128(&mut self, field: &tracing::field::Field, value: u128) {
if let Ok(value) = u64::try_from(value) {
self.fields
.insert(field.name(), serde_json::Value::from(value));
} else {
self.fields
.insert(field.name(), serde_json::Value::from(format!("{value}")));
}
}
#[inline]
fn record_bool(&mut self, field: &tracing::field::Field, value: bool) {
self.fields
.insert(field.name(), serde_json::Value::from(value));
}
#[inline]
fn record_bytes(&mut self, field: &tracing::field::Field, value: &[u8]) {
self.fields
.insert(field.name(), serde_json::Value::from(value));
}
#[inline]
fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
self.fields
.insert(field.name(), serde_json::Value::from(value));
}
#[inline]
fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
self.fields
.insert(field.name(), serde_json::Value::from(format!("{value:?}")));
}
#[inline]
fn record_error(
&mut self,
field: &tracing::field::Field,
value: &(dyn std::error::Error + 'static),
) {
self.fields
.insert(field.name(), serde_json::Value::from(format!("{value}")));
}
}
/// List of field indices skipped during logging. Can list duplicate fields or
/// metafields not meant to be logged.
#[derive(Clone, Default)]
struct SkippedFieldIndices {
bits: u64,
}
impl SkippedFieldIndices {
#[inline]
fn is_empty(&self) -> bool {
self.bits == 0
}
#[inline]
fn push(&mut self, index: usize) {
self.bits |= 1u64
.checked_shl(index as u32)
.expect("field index too large");
}
#[inline]
fn contains(&self, index: usize) -> bool {
self.bits
& 1u64
.checked_shl(index as u32)
.expect("field index too large")
!= 0
}
}
/// Formats a tracing event and writes JSON to its internal buffer including a newline.
// TODO: buffer capacity management, truncate if too large
struct EventFormatter {
logline_buffer: Vec<u8>,
}
impl EventFormatter {
#[inline]
fn new() -> Self {
EventFormatter {
logline_buffer: Vec::new(),
}
}
#[inline]
fn buffer(&self) -> &[u8] {
&self.logline_buffer
}
#[inline]
fn reset(&mut self) {
self.logline_buffer.clear();
}
fn format<S>(
&mut self,
now: DateTime<Utc>,
event: &Event<'_>,
ctx: &Context<'_, S>,
skipped_field_indices: &papaya::HashMap<callsite::Identifier, SkippedFieldIndices>,
) -> io::Result<()>
where
S: Subscriber + for<'a> LookupSpan<'a>,
{
let timestamp = now.to_rfc3339_opts(chrono::SecondsFormat::Micros, true);
use tracing_log::NormalizeEvent;
let normalized_meta = event.normalized_metadata();
let meta = normalized_meta.as_ref().unwrap_or_else(|| event.metadata());
let skipped_field_indices = skipped_field_indices.pin();
let skipped_field_indices = skipped_field_indices.get(&meta.callsite());
let mut serialize = || {
let mut serializer = serde_json::Serializer::new(&mut self.logline_buffer);
let mut serializer = serializer.serialize_map(None)?;
// Timestamp comes first, so raw lines can be sorted by timestamp.
serializer.serialize_entry("timestamp", &timestamp)?;
// Level next.
serializer.serialize_entry("level", &meta.level().as_str())?;
// Message next.
serializer.serialize_key("message")?;
let mut message_extractor =
MessageFieldExtractor::new(serializer, skipped_field_indices);
event.record(&mut message_extractor);
let mut serializer = message_extractor.into_serializer()?;
let mut fields_present = FieldsPresent(false, skipped_field_indices);
event.record(&mut fields_present);
if fields_present.0 {
serializer.serialize_entry(
"fields",
&SerializableEventFields(event, skipped_field_indices),
)?;
}
let pid = std::process::id();
if pid != 1 {
serializer.serialize_entry("process_id", &pid)?;
}
THREAD_ID.with(|tid| serializer.serialize_entry("thread_id", tid))?;
// TODO: tls cache? name could change
if let Some(thread_name) = std::thread::current().name() {
if !thread_name.is_empty() && thread_name != "tokio-runtime-worker" {
serializer.serialize_entry("thread_name", thread_name)?;
}
}
if let Some(task_id) = tokio::task::try_id() {
serializer.serialize_entry("task_id", &format_args!("{task_id}"))?;
}
serializer.serialize_entry("target", meta.target())?;
if let Some(module) = meta.module_path() {
if module != meta.target() {
serializer.serialize_entry("module", module)?;
}
}
if let Some(file) = meta.file() {
if let Some(line) = meta.line() {
serializer.serialize_entry("src", &format_args!("{file}:{line}"))?;
} else {
serializer.serialize_entry("src", file)?;
}
}
{
let otel_context = Span::current().context();
let otel_spanref = otel_context.span();
let span_context = otel_spanref.span_context();
if span_context.is_valid() {
serializer.serialize_entry(
"trace_id",
&format_args!("{}", span_context.trace_id()),
)?;
}
}
serializer.serialize_entry("spans", &SerializableSpanStack(ctx))?;
serializer.end()
};
serialize().map_err(io::Error::other)?;
self.logline_buffer.push(b'\n');
Ok(())
}
}
/// Extracts the message field that's mixed will other fields.
struct MessageFieldExtractor<'a, S: serde::ser::SerializeMap> {
serializer: S,
skipped_field_indices: Option<&'a SkippedFieldIndices>,
state: Option<Result<(), S::Error>>,
}
impl<'a, S: serde::ser::SerializeMap> MessageFieldExtractor<'a, S> {
#[inline]
fn new(serializer: S, skipped_field_indices: Option<&'a SkippedFieldIndices>) -> Self {
Self {
serializer,
skipped_field_indices,
state: None,
}
}
#[inline]
fn into_serializer(mut self) -> Result<S, S::Error> {
match self.state {
Some(Ok(())) => {}
Some(Err(err)) => return Err(err),
None => self.serializer.serialize_value("")?,
}
Ok(self.serializer)
}
#[inline]
fn accept_field(&self, field: &tracing::field::Field) -> bool {
self.state.is_none()
&& field.name() == MESSAGE_FIELD
&& !self
.skipped_field_indices
.is_some_and(|i| i.contains(field.index()))
}
}
impl<S: serde::ser::SerializeMap> tracing::field::Visit for MessageFieldExtractor<'_, S> {
#[inline]
fn record_f64(&mut self, field: &tracing::field::Field, value: f64) {
if self.accept_field(field) {
self.state = Some(self.serializer.serialize_value(&value));
}
}
#[inline]
fn record_i64(&mut self, field: &tracing::field::Field, value: i64) {
if self.accept_field(field) {
self.state = Some(self.serializer.serialize_value(&value));
}
}
#[inline]
fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
if self.accept_field(field) {
self.state = Some(self.serializer.serialize_value(&value));
}
}
#[inline]
fn record_i128(&mut self, field: &tracing::field::Field, value: i128) {
if self.accept_field(field) {
self.state = Some(self.serializer.serialize_value(&value));
}
}
#[inline]
fn record_u128(&mut self, field: &tracing::field::Field, value: u128) {
if self.accept_field(field) {
self.state = Some(self.serializer.serialize_value(&value));
}
}
#[inline]
fn record_bool(&mut self, field: &tracing::field::Field, value: bool) {
if self.accept_field(field) {
self.state = Some(self.serializer.serialize_value(&value));
}
}
#[inline]
fn record_bytes(&mut self, field: &tracing::field::Field, value: &[u8]) {
if self.accept_field(field) {
self.state = Some(self.serializer.serialize_value(&format_args!("{value:x?}")));
}
}
#[inline]
fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
if self.accept_field(field) {
self.state = Some(self.serializer.serialize_value(&value));
}
}
#[inline]
fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
if self.accept_field(field) {
self.state = Some(self.serializer.serialize_value(&format_args!("{value:?}")));
}
}
#[inline]
fn record_error(
&mut self,
field: &tracing::field::Field,
value: &(dyn std::error::Error + 'static),
) {
if self.accept_field(field) {
self.state = Some(self.serializer.serialize_value(&format_args!("{value}")));
}
}
}
/// Checks if there's any fields and field values present. If not, the JSON subobject
/// can be skipped.
// This is entirely optional and only cosmetic, though maybe helps a
// bit during log parsing in dashboards when there's no field with empty object.
struct FieldsPresent<'a>(pub bool, Option<&'a SkippedFieldIndices>);
// Even though some methods have an overhead (error, bytes) it is assumed the
// compiler won't include this since we ignore the value entirely.
impl tracing::field::Visit for FieldsPresent<'_> {
#[inline]
fn record_debug(&mut self, field: &tracing::field::Field, _: &dyn std::fmt::Debug) {
if !self.1.is_some_and(|i| i.contains(field.index()))
&& field.name() != MESSAGE_FIELD
&& !field.name().starts_with("log.")
{
self.0 |= true;
}
}
}
/// Serializes the fields directly supplied with a log event.
struct SerializableEventFields<'a, 'event>(
&'a tracing::Event<'event>,
Option<&'a SkippedFieldIndices>,
);
impl serde::ser::Serialize for SerializableEventFields<'_, '_> {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
use serde::ser::SerializeMap;
let serializer = serializer.serialize_map(None)?;
let mut message_skipper = MessageFieldSkipper::new(serializer, self.1);
self.0.record(&mut message_skipper);
let serializer = message_skipper.into_serializer()?;
serializer.end()
}
}
/// A tracing field visitor that skips the message field.
struct MessageFieldSkipper<'a, S: serde::ser::SerializeMap> {
serializer: S,
skipped_field_indices: Option<&'a SkippedFieldIndices>,
state: Result<(), S::Error>,
}
impl<'a, S: serde::ser::SerializeMap> MessageFieldSkipper<'a, S> {
#[inline]
fn new(serializer: S, skipped_field_indices: Option<&'a SkippedFieldIndices>) -> Self {
Self {
serializer,
skipped_field_indices,
state: Ok(()),
}
}
#[inline]
fn accept_field(&self, field: &tracing::field::Field) -> bool {
self.state.is_ok()
&& field.name() != MESSAGE_FIELD
&& !field.name().starts_with("log.")
&& !self
.skipped_field_indices
.is_some_and(|i| i.contains(field.index()))
}
#[inline]
fn into_serializer(self) -> Result<S, S::Error> {
self.state?;
Ok(self.serializer)
}
}
impl<S: serde::ser::SerializeMap> tracing::field::Visit for MessageFieldSkipper<'_, S> {
#[inline]
fn record_f64(&mut self, field: &tracing::field::Field, value: f64) {
if self.accept_field(field) {
self.state = self.serializer.serialize_entry(field.name(), &value);
}
}
#[inline]
fn record_i64(&mut self, field: &tracing::field::Field, value: i64) {
if self.accept_field(field) {
self.state = self.serializer.serialize_entry(field.name(), &value);
}
}
#[inline]
fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
if self.accept_field(field) {
self.state = self.serializer.serialize_entry(field.name(), &value);
}
}
#[inline]
fn record_i128(&mut self, field: &tracing::field::Field, value: i128) {
if self.accept_field(field) {
self.state = self.serializer.serialize_entry(field.name(), &value);
}
}
#[inline]
fn record_u128(&mut self, field: &tracing::field::Field, value: u128) {
if self.accept_field(field) {
self.state = self.serializer.serialize_entry(field.name(), &value);
}
}
#[inline]
fn record_bool(&mut self, field: &tracing::field::Field, value: bool) {
if self.accept_field(field) {
self.state = self.serializer.serialize_entry(field.name(), &value);
}
}
#[inline]
fn record_bytes(&mut self, field: &tracing::field::Field, value: &[u8]) {
if self.accept_field(field) {
self.state = self
.serializer
.serialize_entry(field.name(), &format_args!("{value:x?}"));
}
}
#[inline]
fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
if self.accept_field(field) {
self.state = self.serializer.serialize_entry(field.name(), &value);
}
}
#[inline]
fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
if self.accept_field(field) {
self.state = self
.serializer
.serialize_entry(field.name(), &format_args!("{value:?}"));
}
}
#[inline]
fn record_error(
&mut self,
field: &tracing::field::Field,
value: &(dyn std::error::Error + 'static),
) {
if self.accept_field(field) {
self.state = self.serializer.serialize_value(&format_args!("{value}"));
}
}
}
/// Serializes the span stack from root to leaf (parent of event) enumerated
/// inside an object where the keys are just the number padded with zeroes
/// to retain sorting order.
// The object is necessary because Loki cannot flatten arrays.
struct SerializableSpanStack<'a, 'b, Span>(&'b Context<'a, Span>)
where
Span: Subscriber + for<'lookup> LookupSpan<'lookup>;
impl<Span> serde::ser::Serialize for SerializableSpanStack<'_, '_, Span>
where
Span: Subscriber + for<'lookup> LookupSpan<'lookup>,
{
fn serialize<Ser>(&self, serializer: Ser) -> Result<Ser::Ok, Ser::Error>
where
Ser: serde::ser::Serializer,
{
let mut serializer = serializer.serialize_map(None)?;
if let Some(leaf_span) = self.0.lookup_current() {
for (i, span) in leaf_span.scope().from_root().enumerate() {
serializer.serialize_entry(&format_args!("{i:02}"), &SerializableSpan(&span))?;
}
}
serializer.end()
}
}
/// Serializes a single span. Include the span ID, name and its fields as
/// recorded up to this point.
struct SerializableSpan<'a, 'b, Span>(&'b SpanRef<'a, Span>)
where
Span: for<'lookup> LookupSpan<'lookup>;
impl<Span> serde::ser::Serialize for SerializableSpan<'_, '_, Span>
where
Span: for<'lookup> LookupSpan<'lookup>,
{
fn serialize<Ser>(&self, serializer: Ser) -> Result<Ser::Ok, Ser::Error>
where
Ser: serde::ser::Serializer,
{
let mut serializer = serializer.serialize_map(None)?;
// TODO: the span ID is probably only useful for debugging tracing.
serializer.serialize_entry("span_id", &format_args!("{:016x}", self.0.id().into_u64()))?;
serializer.serialize_entry("span_name", self.0.metadata().name())?;
let ext = self.0.extensions();
if let Some(data) = ext.get::<SpanFields>() {
for (key, value) in &data.fields.pin() {
serializer.serialize_entry(key, value)?;
}
}
serializer.end()
}
}
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
use std::sync::{Arc, Mutex, MutexGuard};
use assert_json_diff::assert_json_eq;
use tracing::info_span;
use super::*;
struct TestClock {
current_time: Mutex<DateTime<Utc>>,
}
impl Clock for Arc<TestClock> {
fn now(&self) -> DateTime<Utc> {
*self.current_time.lock().expect("poisoned")
}
}
struct VecWriter<'a> {
buffer: MutexGuard<'a, Vec<u8>>,
}
impl MakeWriter for Arc<Mutex<Vec<u8>>> {
fn make_writer(&self) -> impl io::Write {
VecWriter {
buffer: self.lock().expect("poisoned"),
}
}
}
impl io::Write for VecWriter<'_> {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.buffer.write(buf)
}
fn flush(&mut self) -> io::Result<()> {
Ok(())
}
}
#[test]
fn test_field_collection() {
let clock = Arc::new(TestClock {
current_time: Mutex::new(Utc::now()),
});
let buffer = Arc::new(Mutex::new(Vec::new()));
let log_layer = JsonLoggingLayer {
clock: clock.clone(),
skipped_field_indices: papaya::HashMap::default(),
writer: buffer.clone(),
};
let registry = tracing_subscriber::Registry::default().with(log_layer);
tracing::subscriber::with_default(registry, || {
info_span!("span1", x = 40, x = 41, x = 42).in_scope(|| {
info_span!("span2").in_scope(|| {
tracing::error!(
a = 1,
a = 2,
a = 3,
message = "explicit message field",
"implicit message field"
);
});
});
});
let buffer = Arc::try_unwrap(buffer)
.expect("no other reference")
.into_inner()
.expect("poisoned");
let actual: serde_json::Value = serde_json::from_slice(&buffer).expect("valid JSON");
let expected: serde_json::Value = serde_json::json!(
{
"timestamp": clock.now().to_rfc3339_opts(chrono::SecondsFormat::Micros, true),
"level": "ERROR",
"message": "explicit message field",
"fields": {
"a": 3,
},
"spans": {
"00":{
"span_id": "0000000000000001",
"span_name": "span1",
"x": 42,
},
"01": {
"span_id": "0000000000000002",
"span_name": "span2",
}
},
"src": actual.as_object().unwrap().get("src").unwrap().as_str().unwrap(),
"target": "proxy::logging::tests",
"process_id": actual.as_object().unwrap().get("process_id").unwrap().as_number().unwrap(),
"thread_id": actual.as_object().unwrap().get("thread_id").unwrap().as_number().unwrap(),
"thread_name": "logging::tests::test_field_collection",
}
);
assert_json_eq!(actual, expected);
}
}

View File

@@ -225,7 +225,7 @@ pub(crate) enum NotifyError {
// We shutdown while sending
#[error("Shutting down")]
ShuttingDown,
// A response indicates we will never succeed, such as 400 or 403
// A response indicates we will never succeed, such as 400 or 404
#[error("Non-retryable error {0}")]
Fatal(StatusCode),

View File

@@ -27,7 +27,7 @@ use pageserver_api::shard::ShardConfigError;
use pageserver_api::shard::ShardIdentity;
use pageserver_api::shard::ShardStripeSize;
use pageserver_api::shard::{ShardCount, ShardNumber, TenantShardId};
use rustls::client::danger::{ServerCertVerified, ServerCertVerifier};
use rustls::client::danger::ServerCertVerifier;
use rustls::client::WebPkiServerVerifier;
use rustls::crypto::ring;
use scoped_futures::ScopedBoxFuture;
@@ -194,8 +194,6 @@ impl Persistence {
timeout: Duration,
) -> Result<(), diesel::ConnectionError> {
let started_at = Instant::now();
log_postgres_connstr_info(database_url)
.map_err(|e| diesel::ConnectionError::InvalidConnectionUrl(e.to_string()))?;
loop {
match establish_connection_rustls(database_url).await {
Ok(_) => {
@@ -1283,51 +1281,6 @@ pub(crate) fn load_certs() -> anyhow::Result<Arc<rustls::RootCertStore>> {
Ok(Arc::new(store))
}
#[derive(Debug)]
/// A verifier that accepts all certificates (but logs an error still)
struct AcceptAll(Arc<WebPkiServerVerifier>);
impl ServerCertVerifier for AcceptAll {
fn verify_server_cert(
&self,
end_entity: &rustls::pki_types::CertificateDer<'_>,
intermediates: &[rustls::pki_types::CertificateDer<'_>],
server_name: &rustls::pki_types::ServerName<'_>,
ocsp_response: &[u8],
now: rustls::pki_types::UnixTime,
) -> Result<ServerCertVerified, rustls::Error> {
let r =
self.0
.verify_server_cert(end_entity, intermediates, server_name, ocsp_response, now);
if let Err(err) = r {
tracing::info!(
?server_name,
"ignoring db connection TLS validation error: {err:?}"
);
return Ok(ServerCertVerified::assertion());
}
r
}
fn verify_tls12_signature(
&self,
message: &[u8],
cert: &rustls::pki_types::CertificateDer<'_>,
dss: &rustls::DigitallySignedStruct,
) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
self.0.verify_tls12_signature(message, cert, dss)
}
fn verify_tls13_signature(
&self,
message: &[u8],
cert: &rustls::pki_types::CertificateDer<'_>,
dss: &rustls::DigitallySignedStruct,
) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
self.0.verify_tls13_signature(message, cert, dss)
}
fn supported_verify_schemes(&self) -> Vec<rustls::SignatureScheme> {
self.0.supported_verify_schemes()
}
}
/// Loads the root certificates and constructs a client config suitable for connecting.
/// This function is blocking.
fn client_config_with_root_certs() -> anyhow::Result<rustls::ClientConfig> {
@@ -1337,12 +1290,76 @@ fn client_config_with_root_certs() -> anyhow::Result<rustls::ClientConfig> {
.expect("ring should support the default protocol versions");
static DO_CERT_CHECKS: std::sync::OnceLock<bool> = std::sync::OnceLock::new();
let do_cert_checks =
DO_CERT_CHECKS.get_or_init(|| std::env::var("STORCON_DB_CERT_CHECKS").is_ok());
DO_CERT_CHECKS.get_or_init(|| std::env::var("STORCON_CERT_CHECKS").is_ok());
Ok(if *do_cert_checks {
client_config
.with_root_certificates(load_certs()?)
.with_no_client_auth()
} else {
use rustls::client::danger::{HandshakeSignatureValid, ServerCertVerified};
#[derive(Debug)]
struct AcceptAll(Arc<WebPkiServerVerifier>);
impl ServerCertVerifier for AcceptAll {
fn verify_server_cert(
&self,
end_entity: &rustls::pki_types::CertificateDer<'_>,
intermediates: &[rustls::pki_types::CertificateDer<'_>],
server_name: &rustls::pki_types::ServerName<'_>,
ocsp_response: &[u8],
now: rustls::pki_types::UnixTime,
) -> Result<ServerCertVerified, rustls::Error> {
let r = self.0.verify_server_cert(
end_entity,
intermediates,
server_name,
ocsp_response,
now,
);
if let Err(err) = r {
tracing::info!(
?server_name,
"ignoring db connection TLS validation error: {err:?}"
);
return Ok(ServerCertVerified::assertion());
}
r
}
fn verify_tls12_signature(
&self,
message: &[u8],
cert: &rustls::pki_types::CertificateDer<'_>,
dss: &rustls::DigitallySignedStruct,
) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error>
{
let r = self.0.verify_tls12_signature(message, cert, dss);
if let Err(err) = r {
tracing::info!(
"ignoring db connection 1.2 signature TLS validation error: {err:?}"
);
return Ok(HandshakeSignatureValid::assertion());
}
r
}
fn verify_tls13_signature(
&self,
message: &[u8],
cert: &rustls::pki_types::CertificateDer<'_>,
dss: &rustls::DigitallySignedStruct,
) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error>
{
let r = self.0.verify_tls13_signature(message, cert, dss);
if let Err(err) = r {
tracing::info!(
"ignoring db connection 1.3 signature TLS validation error: {err:?}"
);
return Ok(HandshakeSignatureValid::assertion());
}
r
}
fn supported_verify_schemes(&self) -> Vec<rustls::SignatureScheme> {
self.0.supported_verify_schemes()
}
}
let verifier = AcceptAll(
WebPkiServerVerifier::builder_with_provider(
load_certs()?,
@@ -1372,29 +1389,6 @@ fn establish_connection_rustls(config: &str) -> BoxFuture<ConnectionResult<Async
fut.boxed()
}
#[cfg_attr(test, test)]
fn test_config_debug_censors_password() {
let has_pw =
"host=/var/lib/postgresql,localhost port=1234 user=specialuser password='NOT ALLOWED TAG'";
let has_pw_cfg = has_pw.parse::<tokio_postgres::Config>().unwrap();
assert!(format!("{has_pw_cfg:?}").contains("specialuser"));
// Ensure that the password is not leaked by the debug impl
assert!(!format!("{has_pw_cfg:?}").contains("NOT ALLOWED TAG"));
}
fn log_postgres_connstr_info(config_str: &str) -> anyhow::Result<()> {
let config = config_str
.parse::<tokio_postgres::Config>()
.map_err(|_e| anyhow::anyhow!("Couldn't parse config str"))?;
// We use debug formatting here, and use a unit test to ensure that we don't leak the password.
// To make extra sure the test gets ran, run it every time the function is called
// (this is rather cold code, we can afford it).
#[cfg(not(test))]
test_config_debug_censors_password();
tracing::info!("database connection config: {config:?}");
Ok(())
}
/// Parts of [`crate::tenant_shard::TenantShard`] that are stored durably
#[derive(
QueryableByName, Queryable, Selectable, Insertable, Serialize, Deserialize, Clone, Eq, PartialEq,

View File

@@ -115,15 +115,6 @@ impl ReconcilerConfigBuilder {
}
}
pub(crate) fn tenant_creation_hint(self, hint: bool) -> Self {
Self {
config: ReconcilerConfig {
tenant_creation_hint: hint,
..self.config
},
}
}
pub(crate) fn build(self) -> ReconcilerConfig {
self.config
}
@@ -138,10 +129,6 @@ pub(crate) struct ReconcilerConfig {
// During live migrations this is the amount of time that
// the pagserver will hold our poll.
secondary_download_request_timeout: Option<Duration>,
// A hint indicating whether this reconciliation is done on the
// creation of a new tenant. This only informs logging behaviour.
tenant_creation_hint: bool,
}
impl ReconcilerConfig {
@@ -156,10 +143,6 @@ impl ReconcilerConfig {
self.secondary_download_request_timeout
.unwrap_or(SECONDARY_DOWNLOAD_REQUEST_TIMEOUT_DEFAULT)
}
pub(crate) fn tenant_creation_hint(&self) -> bool {
self.tenant_creation_hint
}
}
/// RAII resource units granted to a Reconciler, which it should keep alive until it finishes doing I/O
@@ -951,35 +934,16 @@ impl Reconciler {
)
.await;
if let Err(e) = &result {
// Set this flag so that in our ReconcileResult we will set the flag on the shard that it
// needs to retry at some point.
self.compute_notify_failure = true;
// It is up to the caller whether they want to drop out on this error, but they don't have to:
// in general we should avoid letting unavailability of the cloud control plane stop us from
// making progress.
match e {
// 404s from cplane during tenant creation are expected.
// Cplane only persists the shards to the database after
// creating the tenant and the timeline. If we notify before
// that, we'll get a 404.
//
// This is fine because tenant creations happen via /location_config
// and that returns the list of locations in the response. Hence, we
// silence the error and return Ok(()) here. Reconciliation will still
// be retried because we set [`Reconciler::compute_notify_failure`] above.
NotifyError::Unexpected(hyper::StatusCode::NOT_FOUND)
if self.reconciler_config.tenant_creation_hint() =>
{
return Ok(());
}
NotifyError::ShuttingDown => {}
_ => {
tracing::warn!(
"Failed to notify compute of attached pageserver {node}: {e}"
);
}
if !matches!(e, NotifyError::ShuttingDown) {
tracing::warn!("Failed to notify compute of attached pageserver {node}: {e}");
}
// Set this flag so that in our ReconcileResult we will set the flag on the shard that it
// needs to retry at some point.
self.compute_notify_failure = true;
}
result
} else {

View File

@@ -2238,14 +2238,9 @@ impl Service {
let waiters = {
let mut locked = self.inner.write().unwrap();
let (nodes, tenants, _scheduler) = locked.parts_mut();
let config = ReconcilerConfigBuilder::new()
.tenant_creation_hint(true)
.build();
tenants
.range_mut(TenantShardId::tenant_range(tenant_id))
.filter_map(|(_shard_id, shard)| {
self.maybe_configured_reconcile_shard(shard, nodes, config)
})
.filter_map(|(_shard_id, shard)| self.maybe_reconcile_shard(shard, nodes))
.collect::<Vec<_>>()
};

View File

@@ -707,7 +707,6 @@ impl TenantShard {
if let Some(node_id) = self.intent.get_attached() {
// Populate secondary by demoting the attached node
self.intent.demote_attached(scheduler, *node_id);
modified = true;
} else if self.intent.secondary.is_empty() {
// Populate secondary by scheduling a fresh node
@@ -980,51 +979,24 @@ impl TenantShard {
),
)
})
.collect::<HashMap<_, _>>();
.collect::<Vec<_>>();
if secondary_scores.iter().any(|score| score.1.is_none()) {
// Trivial case: if we only have one secondary, drop that one
if self.intent.get_secondary().len() == 1 {
return Some(ScheduleOptimization {
sequence: self.sequence,
action: ScheduleOptimizationAction::RemoveSecondary(
*self.intent.get_secondary().first().unwrap(),
),
});
}
// Try to find a "good" secondary to keep, without relying on scores (one or more nodes is in a state
// where its score can't be calculated), and drop the others. This enables us to make progress in
// most cases, even if some nodes are offline or have scheduling=pause set.
debug_assert!(self.intent.attached.is_some()); // We should not make it here unless attached -- this
// logic presumes we are in a mode where we want secondaries to be in non-home AZ
if let Some(retain_secondary) = self.intent.get_secondary().iter().find(|n| {
let in_home_az = scheduler.get_node_az(n) == self.intent.preferred_az_id;
let is_available = secondary_scores
.get(n)
.expect("Built from same list of nodes")
.is_some();
is_available && !in_home_az
}) {
// Great, we found one to retain. Pick some other to drop.
if let Some(victim) = self
.intent
.get_secondary()
.iter()
.find(|n| n != &retain_secondary)
{
// Don't have full list of scores, so can't make a good decision about which to drop unless
// there is an obvious one in the wrong AZ
for secondary in self.intent.get_secondary() {
if scheduler.get_node_az(secondary) == self.intent.preferred_az_id {
return Some(ScheduleOptimization {
sequence: self.sequence,
action: ScheduleOptimizationAction::RemoveSecondary(*victim),
action: ScheduleOptimizationAction::RemoveSecondary(*secondary),
});
}
}
// Fall through: we didn't identify one to remove. This ought to be rare.
tracing::warn!("Keeping extra secondaries: can't determine which of {:?} to remove (some nodes offline?)",
self.intent.get_secondary()
);
self.intent.get_secondary()
);
} else {
let victim = secondary_scores
.iter()
@@ -1033,7 +1005,7 @@ impl TenantShard {
.0;
return Some(ScheduleOptimization {
sequence: self.sequence,
action: ScheduleOptimizationAction::RemoveSecondary(*victim),
action: ScheduleOptimizationAction::RemoveSecondary(victim),
});
}
}
@@ -2407,110 +2379,6 @@ pub(crate) mod tests {
Ok(())
}
/// Test how the optimisation code behaves with an extra secondary
#[test]
fn optimize_removes_secondary() -> anyhow::Result<()> {
let az_a_tag = AvailabilityZone("az-a".to_string());
let az_b_tag = AvailabilityZone("az-b".to_string());
let mut nodes = make_test_nodes(
4,
&[
az_a_tag.clone(),
az_b_tag.clone(),
az_a_tag.clone(),
az_b_tag.clone(),
],
);
let mut scheduler = Scheduler::new(nodes.values());
let mut schedule_context = ScheduleContext::default();
let mut shard_a = make_test_tenant_shard(PlacementPolicy::Attached(1));
shard_a.intent.preferred_az_id = Some(az_a_tag.clone());
shard_a
.schedule(&mut scheduler, &mut schedule_context)
.unwrap();
// Attached on node 1, secondary on node 2
assert_eq!(shard_a.intent.get_attached(), &Some(NodeId(1)));
assert_eq!(shard_a.intent.get_secondary(), &vec![NodeId(2)]);
// Initially optimiser is idle
assert_eq!(
shard_a.optimize_attachment(&mut scheduler, &schedule_context),
None
);
assert_eq!(
shard_a.optimize_secondary(&mut scheduler, &schedule_context),
None
);
// A spare secondary in the home AZ: it should be removed -- this is the situation when we're midway through a graceful migration, after cutting over
// to our new location
shard_a.intent.push_secondary(&mut scheduler, NodeId(3));
let optimization = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
assert_eq!(
optimization,
Some(ScheduleOptimization {
sequence: shard_a.sequence,
action: ScheduleOptimizationAction::RemoveSecondary(NodeId(3))
})
);
shard_a.apply_optimization(&mut scheduler, optimization.unwrap());
// A spare secondary in the non-home AZ, and one of them is offline
shard_a.intent.push_secondary(&mut scheduler, NodeId(4));
nodes
.get_mut(&NodeId(4))
.unwrap()
.set_availability(NodeAvailability::Offline);
scheduler.node_upsert(nodes.get(&NodeId(4)).unwrap());
let optimization = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
assert_eq!(
optimization,
Some(ScheduleOptimization {
sequence: shard_a.sequence,
action: ScheduleOptimizationAction::RemoveSecondary(NodeId(4))
})
);
shard_a.apply_optimization(&mut scheduler, optimization.unwrap());
// A spare secondary when should have none
shard_a.policy = PlacementPolicy::Attached(0);
let optimization = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
assert_eq!(
optimization,
Some(ScheduleOptimization {
sequence: shard_a.sequence,
action: ScheduleOptimizationAction::RemoveSecondary(NodeId(2))
})
);
shard_a.apply_optimization(&mut scheduler, optimization.unwrap());
assert_eq!(shard_a.intent.get_attached(), &Some(NodeId(1)));
assert_eq!(shard_a.intent.get_secondary(), &vec![]);
// Check that in secondary mode, we preserve the secondary in the preferred AZ
let mut schedule_context = ScheduleContext::default(); // Fresh context, we're about to call schedule()
shard_a.policy = PlacementPolicy::Secondary;
shard_a
.schedule(&mut scheduler, &mut schedule_context)
.unwrap();
assert_eq!(shard_a.intent.get_attached(), &None);
assert_eq!(shard_a.intent.get_secondary(), &vec![NodeId(1)]);
assert_eq!(
shard_a.optimize_attachment(&mut scheduler, &schedule_context),
None
);
assert_eq!(
shard_a.optimize_secondary(&mut scheduler, &schedule_context),
None
);
shard_a.intent.clear(&mut scheduler);
Ok(())
}
// Optimize til quiescent: this emulates what Service::optimize_all does, when
// called repeatedly in the background.
// Returns the applied optimizations

View File

@@ -2105,7 +2105,7 @@ class NeonStorageController(MetricsGetter, LogUtils):
log.info(f"reconcile_all waited for {n} shards")
return n
def reconcile_until_idle(self, timeout_secs=30, max_interval=5):
def reconcile_until_idle(self, timeout_secs=30, max_interval=1):
start_at = time.time()
n = 1
delay_sec = 0.1
@@ -2766,11 +2766,6 @@ class NeonPageserver(PgProtocol, LogUtils):
log.error(f"Failed to decode LocationConf, raw content ({len(bytes)} bytes): {bytes}")
raise
def heatmap_content(self, tenant_shard_id: TenantId | TenantShardId) -> Any:
path = self.tenant_dir(tenant_shard_id) / "heatmap-v1.json"
with open(path) as f:
return json.load(f)
def tenant_create(
self,
tenant_id: TenantId,

View File

@@ -34,20 +34,16 @@ def test_layer_map(neon_env_builder: NeonEnvBuilder, zenbenchmark):
cur.execute("set log_statement = 'all'")
cur.execute("create table t(x integer)")
for _ in range(n_iters):
with zenbenchmark.record_duration(f"insert into t values (generate_series(1,{n_records}))"):
cur.execute(f"insert into t values (generate_series(1,{n_records}))")
cur.execute(f"insert into t values (generate_series(1,{n_records}))")
time.sleep(1)
with zenbenchmark.record_duration("vacuum t"):
cur.execute("vacuum t")
cur.execute("vacuum t")
with zenbenchmark.record_duration("SELECT count(*) from t"):
with zenbenchmark.record_duration("test_query"):
cur.execute("SELECT count(*) from t")
assert cur.fetchone() == (n_iters * n_records,)
with zenbenchmark.record_duration("flush_ep_to_pageserver"):
flush_ep_to_pageserver(env, endpoint, tenant, timeline)
with zenbenchmark.record_duration("timeline_checkpoint"):
env.pageserver.http_client().timeline_checkpoint(
tenant, timeline, compact=False, wait_until_uploaded=True
)
flush_ep_to_pageserver(env, endpoint, tenant, timeline)
env.pageserver.http_client().timeline_checkpoint(
tenant, timeline, compact=False, wait_until_uploaded=True
)

View File

@@ -29,21 +29,6 @@ AGGRESSIVE_COMPACTION_TENANT_CONF = {
# "lsn_lease_length": "0s", -- TODO: would cause branch creation errors, should fix later
}
PREEMPT_COMPACTION_TENANT_CONF = {
"gc_period": "5s",
"compaction_period": "5s",
# Small checkpoint distance to create many layers
"checkpoint_distance": 1024**2,
# Compact small layers
"compaction_target_size": 1024**2,
"image_creation_threshold": 1,
"image_creation_preempt_threshold": 1,
# compact more frequently
"compaction_threshold": 3,
"compaction_upper_limit": 6,
"lsn_lease_length": "0s",
}
@skip_in_debug_build("only run with release build")
@pytest.mark.parametrize(
@@ -51,8 +36,7 @@ PREEMPT_COMPACTION_TENANT_CONF = {
[PageserverWalReceiverProtocol.VANILLA, PageserverWalReceiverProtocol.INTERPRETED],
)
def test_pageserver_compaction_smoke(
neon_env_builder: NeonEnvBuilder,
wal_receiver_protocol: PageserverWalReceiverProtocol,
neon_env_builder: NeonEnvBuilder, wal_receiver_protocol: PageserverWalReceiverProtocol
):
"""
This is a smoke test that compaction kicks in. The workload repeatedly churns
@@ -70,8 +54,7 @@ def test_pageserver_compaction_smoke(
page_cache_size=10
"""
conf = AGGRESSIVE_COMPACTION_TENANT_CONF.copy()
env = neon_env_builder.init_start(initial_tenant_conf=conf)
env = neon_env_builder.init_start(initial_tenant_conf=AGGRESSIVE_COMPACTION_TENANT_CONF)
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
@@ -130,41 +113,6 @@ page_cache_size=10
assert vectored_average < 8
@skip_in_debug_build("only run with release build")
def test_pageserver_compaction_preempt(
neon_env_builder: NeonEnvBuilder,
):
# Ideally we should be able to do unit tests for this, but we need real Postgres
# WALs in order to do unit testing...
conf = PREEMPT_COMPACTION_TENANT_CONF.copy()
env = neon_env_builder.init_start(initial_tenant_conf=conf)
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
row_count = 200000
churn_rounds = 10
ps_http = env.pageserver.http_client()
workload = Workload(env, tenant_id, timeline_id)
workload.init(env.pageserver.id)
log.info("Writing initial data ...")
workload.write_rows(row_count, env.pageserver.id)
for i in range(1, churn_rounds + 1):
log.info(f"Running churn round {i}/{churn_rounds} ...")
workload.churn_rows(row_count, env.pageserver.id, upload=False)
workload.validate(env.pageserver.id)
ps_http.timeline_compact(tenant_id, timeline_id, wait_until_uploaded=True)
log.info("Validating at workload end ...")
workload.validate(env.pageserver.id)
# ensure image layer creation gets preempted and then resumed
env.pageserver.assert_log_contains("resuming image layer creation")
@skip_in_debug_build("only run with release build")
@pytest.mark.parametrize(
"with_branches",
@@ -302,9 +250,6 @@ def test_pageserver_gc_compaction_idempotent(
workload.churn_rows(row_count, env.pageserver.id)
# compact 3 times if mode is before_restart
n_compactions = 3 if compaction_mode == "before_restart" else 1
ps_http.timeline_compact(
tenant_id, timeline_id, force_l0_compaction=True, wait_until_uploaded=True
)
for _ in range(n_compactions):
# Force refresh gc info to have gc_cutoff generated
ps_http.timeline_gc(tenant_id, timeline_id, None)

View File

@@ -95,8 +95,6 @@ def test_remote_extensions(
# mock remote_extensions spec
spec: dict[str, Any] = {
"public_extensions": ["anon"],
"custom_extensions": None,
"library_index": {
"anon": "anon",
},

View File

@@ -443,7 +443,7 @@ def test_heatmap_uploads(neon_env_builder: NeonEnvBuilder):
workload.write_rows(256, env.pageservers[0].id)
env.pageserver.http_client().tenant_heatmap_upload(tenant_id)
def validate_heatmap(heatmap, on_disk_heatmap):
def validate_heatmap(heatmap):
assert len(heatmap["timelines"]) == 1
assert heatmap["timelines"][0]["timeline_id"] == str(timeline_id)
assert len(heatmap["timelines"][0]["layers"]) > 0
@@ -452,13 +452,10 @@ def test_heatmap_uploads(neon_env_builder: NeonEnvBuilder):
# Each layer appears at most once
assert len(set(layer["name"] for layer in layers)) == len(layers)
assert heatmap == on_disk_heatmap
# Download and inspect the heatmap that the pageserver uploaded
heatmap_first = env.pageserver_remote_storage.heatmap_content(tenant_id)
heatmap_first_on_disk = env.pageserver.heatmap_content(tenant_id)
log.info(f"Read back heatmap: {heatmap_first}")
validate_heatmap(heatmap_first, heatmap_first_on_disk)
validate_heatmap(heatmap_first)
# Do some more I/O to generate more layers
workload.churn_rows(64, env.pageservers[0].id)
@@ -466,10 +463,9 @@ def test_heatmap_uploads(neon_env_builder: NeonEnvBuilder):
# Ensure that another heatmap upload includes the new layers
heatmap_second = env.pageserver_remote_storage.heatmap_content(tenant_id)
heatmap_second_on_disk = env.pageserver.heatmap_content(tenant_id)
log.info(f"Read back heatmap: {heatmap_second}")
assert heatmap_second != heatmap_first
validate_heatmap(heatmap_second, heatmap_second_on_disk)
validate_heatmap(heatmap_second)
def list_elegible_layers(

View File

@@ -92,7 +92,6 @@ tonic = { version = "0.12", default-features = false, features = ["codegen", "pr
tower = { version = "0.4", default-features = false, features = ["balance", "buffer", "limit", "util"] }
tracing = { version = "0.1", features = ["log"] }
tracing-core = { version = "0.1" }
tracing-log = { version = "0.2" }
url = { version = "2", features = ["serde"] }
zerocopy = { version = "0.7", features = ["derive", "simd"] }
zeroize = { version = "1", features = ["derive", "serde"] }