Compare commits

..

3 Commits

Author SHA1 Message Date
Folke Behrens
ef737e7d7c proxy: add benchmark for custom json logging vs official fmt logger 2025-07-15 19:44:41 +02:00
Arpad Müller
5c934efb29 Don't depend on the postgres_ffi just for one type (#12610)
We don't want to depend on postgres_ffi in an API crate. If there is no
such dependency, we can compile stuff like `storcon_cli` without needing
a full working postgres build. Fixes regression of #12548 (before we
could compile it).
2025-07-15 17:28:08 +00:00
Heikki Linnakangas
5c9c3b3317 Misc cosmetic cleanups (#12598)
- Remove a few obsolete "allowed error messages" from tests. The
pageserver doesn't emit those messages anymore.

- Remove misplaced and outdated docstring comment from
`test_tenants.py`. A docstring is supposed to be the first thing in a
function, but we had added some code before it. And it was outdated, as
we haven't supported running without safekeepers for a long time.

- Fix misc typos in comments

- Remove obsolete comment about backwards compatibility with safekeepers
without `TIMELINE_STATUS` API. All safekeepers have it by now.
2025-07-15 14:36:28 +00:00
28 changed files with 175 additions and 37 deletions

Cargo.lock (generated, 4 changes)
View File

@@ -5303,6 +5303,7 @@ dependencies = [
"clashmap",
"compute_api",
"consumption_metrics",
"criterion",
"ecdsa 0.16.9",
"ed25519-dalek",
"env_logger",
@@ -6211,6 +6212,7 @@ dependencies = [
"postgres-protocol",
"postgres_backend",
"postgres_ffi",
"postgres_ffi_types",
"postgres_versioninfo",
"pprof",
"pq_proto",
@@ -6255,7 +6257,7 @@ dependencies = [
"anyhow",
"const_format",
"pageserver_api",
"postgres_ffi",
"postgres_ffi_types",
"postgres_versioninfo",
"pq_proto",
"serde",

View File

@@ -1286,9 +1286,7 @@ impl ComputeNode {
 // In case of error, log and fail the check, but don't crash.
 // We're playing it safe because these errors could be transient
-// and we don't yet retry. Also being careful here allows us to
-// be backwards compatible with safekeepers that don't have the
-// TIMELINE_STATUS API yet.
+// and we don't yet retry.
 if responses.len() < quorum {
 error!(
 "failed sync safekeepers check {:?} {:?} {:?}",

View File

@@ -464,7 +464,7 @@ impl Endpoint {
conf.append("max_connections", "100");
conf.append("wal_level", "logical");
// wal_sender_timeout is the maximum time to wait for WAL replication.
// It also defines how often the walreciever will send a feedback message to the wal sender.
// It also defines how often the walreceiver will send a feedback message to the wal sender.
conf.append("wal_sender_timeout", "5s");
conf.append("listen_addresses", &self.pg_address.ip().to_string());
conf.append("port", &self.pg_address.port().to_string());

View File

@@ -75,7 +75,7 @@ CLI examples:
 * AWS S3 : `env AWS_ACCESS_KEY_ID='SOMEKEYAAAAASADSAH*#' AWS_SECRET_ACCESS_KEY='SOMEsEcReTsd292v' ${PAGESERVER_BIN} -c "remote_storage={bucket_name='some-sample-bucket',bucket_region='eu-north-1', prefix_in_bucket='/test_prefix/'}"`
 For Amazon AWS S3, a key id and secret access key could be located in `~/.aws/credentials` if awscli was ever configured to work with the desired bucket, on the AWS Settings page for a certain user. Also note, that the bucket names does not contain any protocols when used on AWS.
-For local S3 installations, refer to the their documentation for name format and credentials.
+For local S3 installations, refer to their documentation for name format and credentials.
 Similar to other pageserver settings, toml config file can be used to configure either of the storages as backup targets.
 Required sections are:

View File

@@ -110,7 +110,6 @@ fn main() -> anyhow::Result<()> {
.allowlist_type("XLogRecPtr")
.allowlist_type("XLogSegNo")
.allowlist_type("TimeLineID")
.allowlist_type("TimestampTz")
.allowlist_type("MultiXactId")
.allowlist_type("MultiXactOffset")
.allowlist_type("MultiXactStatus")

View File

@@ -227,8 +227,7 @@ pub mod walrecord;
 // Export some widely used datatypes that are unlikely to change across Postgres versions
 pub use v14::bindings::{
 BlockNumber, CheckPoint, ControlFileData, MultiXactId, OffsetNumber, Oid, PageHeaderData,
-RepOriginId, TimeLineID, TimestampTz, TransactionId, XLogRecPtr, XLogRecord, XLogSegNo, uint32,
-uint64,
+RepOriginId, TimeLineID, TransactionId, XLogRecPtr, XLogRecord, XLogSegNo, uint32, uint64,
 };
 // Likewise for these, although the assumption that these don't change is a little more iffy.
 pub use v14::bindings::{MultiXactOffset, MultiXactStatus};

View File

@@ -4,13 +4,14 @@
 //! TODO: Generate separate types for each supported PG version
 use bytes::{Buf, Bytes};
+use postgres_ffi_types::TimestampTz;
 use serde::{Deserialize, Serialize};
 use utils::bin_ser::DeserializeError;
 use utils::lsn::Lsn;
 use crate::{
 BLCKSZ, BlockNumber, MultiXactId, MultiXactOffset, MultiXactStatus, Oid, PgMajorVersion,
-RepOriginId, TimestampTz, TransactionId, XLOG_SIZE_OF_XLOG_RECORD, XLogRecord, pg_constants,
+RepOriginId, TransactionId, XLOG_SIZE_OF_XLOG_RECORD, XLogRecord, pg_constants,
 };
 #[repr(C)]
@@ -863,7 +864,8 @@ pub mod v17 {
 XlHeapDelete, XlHeapInsert, XlHeapLock, XlHeapMultiInsert, XlHeapUpdate, XlParameterChange,
 rm_neon,
 };
-pub use crate::{TimeLineID, TimestampTz};
+pub use crate::TimeLineID;
+pub use postgres_ffi_types::TimestampTz;
 #[repr(C)]
 #[derive(Debug)]

View File

@@ -9,10 +9,11 @@
 use super::super::waldecoder::WalStreamDecoder;
 use super::bindings::{
-CheckPoint, ControlFileData, DBState_DB_SHUTDOWNED, FullTransactionId, TimeLineID, TimestampTz,
+CheckPoint, ControlFileData, DBState_DB_SHUTDOWNED, FullTransactionId, TimeLineID,
 XLogLongPageHeaderData, XLogPageHeaderData, XLogRecPtr, XLogRecord, XLogSegNo, XLOG_PAGE_MAGIC,
 MY_PGVERSION
 };
+use postgres_ffi_types::TimestampTz;
 use super::wal_generator::LogicalMessageGenerator;
 use crate::pg_constants;
 use crate::PG_TLI;

View File

@@ -11,3 +11,4 @@ pub mod forknum;
 pub type Oid = u32;
 pub type RepOriginId = u16;
+pub type TimestampTz = i64;
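
This one-line addition is the heart of #12610: TimestampTz becomes a plain alias in the dependency-free postgres_ffi_types crate, so API crates no longer need the bindgen-based postgres_ffi (and its full postgres build) just for one type. A minimal sketch of the pattern, using only what the diffs in this compare show:

    // postgres_ffi_types/src/lib.rs: plain aliases, no postgres build required
    pub type Oid = u32;
    pub type RepOriginId = u16;
    pub type TimestampTz = i64;

    // downstream crates swap a single import:
    // before: use postgres_ffi::TimestampTz;
    use postgres_ffi_types::TimestampTz;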

View File

@@ -9,7 +9,7 @@ anyhow.workspace = true
 const_format.workspace = true
 serde.workspace = true
 serde_json.workspace = true
-postgres_ffi.workspace = true
+postgres_ffi_types.workspace = true
 postgres_versioninfo.workspace = true
 pq_proto.workspace = true
 tokio.workspace = true

View File

@@ -3,7 +3,7 @@
 use std::net::SocketAddr;
 use pageserver_api::shard::ShardIdentity;
-use postgres_ffi::TimestampTz;
+use postgres_ffi_types::TimestampTz;
 use postgres_versioninfo::PgVersionId;
 use serde::{Deserialize, Serialize};
 use tokio::time::Instant;

View File

@@ -2,7 +2,8 @@
 use bytes::Bytes;
 use postgres_ffi::walrecord::{MultiXactMember, describe_postgres_wal_record};
-use postgres_ffi::{MultiXactId, MultiXactOffset, TimestampTz, TransactionId};
+use postgres_ffi::{MultiXactId, MultiXactOffset, TransactionId};
+use postgres_ffi_types::TimestampTz;
 use serde::{Deserialize, Serialize};
 use utils::bin_ser::DeserializeError;

View File

@@ -1,5 +1,5 @@
 //! The validator is responsible for validating DeletionLists for execution,
-//! based on whethe the generation in the DeletionList is still the latest
+//! based on whether the generation in the DeletionList is still the latest
 //! generation for a tenant.
 //!
 //! The purpose of validation is to ensure split-brain safety in the cluster
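
As a hedged illustration of the rule this doc comment states (the names and types below are hypothetical, not the pageserver's actual code): a DeletionList may only be executed while its generation is still the tenant's latest, otherwise a newer attachment could be racing with it.

    // illustrative sketch only
    struct DeletionList {
        generation: u32,
        // ... keys scheduled for deletion
    }

    fn may_execute(list: &DeletionList, latest_generation: u32) -> bool {
        // a stale generation means another node may have attached the
        // tenant since this list was built; drop the list instead
        list.generation == latest_generation
    }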

View File

@@ -25,9 +25,9 @@ use pageserver_api::keyspace::{KeySpaceRandomAccum, SparseKeySpace};
 use pageserver_api::models::RelSizeMigration;
 use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind};
 use pageserver_api::shard::ShardIdentity;
-use postgres_ffi::{BLCKSZ, PgMajorVersion, TimestampTz, TransactionId};
+use postgres_ffi::{BLCKSZ, PgMajorVersion, TransactionId};
 use postgres_ffi_types::forknum::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM};
-use postgres_ffi_types::{Oid, RepOriginId};
+use postgres_ffi_types::{Oid, RepOriginId, TimestampTz};
 use serde::{Deserialize, Serialize};
 use strum::IntoEnumIterator;
 use tokio_util::sync::CancellationToken;

View File

@@ -1,6 +1,6 @@
 //! An utilization metric which is used to decide on which pageserver to put next tenant.
 //!
-//! The metric is exposed via `GET /v1/utilization`. Refer and maintain it's openapi spec as the
+//! The metric is exposed via `GET /v1/utilization`. Refer and maintain its openapi spec as the
 //! truth.
 use std::path::Path;

View File

@@ -32,9 +32,10 @@ use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind};
 use pageserver_api::shard::ShardIdentity;
 use postgres_ffi::walrecord::*;
 use postgres_ffi::{
-PgMajorVersion, TimestampTz, TransactionId, dispatch_pgversion, enum_pgversion,
-enum_pgversion_dispatch, fsm_logical_to_physical, pg_constants,
+PgMajorVersion, TransactionId, dispatch_pgversion, enum_pgversion, enum_pgversion_dispatch,
+fsm_logical_to_physical, pg_constants,
 };
+use postgres_ffi_types::TimestampTz;
 use postgres_ffi_types::forknum::{FSM_FORKNUM, INIT_FORKNUM, MAIN_FORKNUM, VISIBILITYMAP_FORKNUM};
 use tracing::*;
 use utils::bin_ser::{DeserializeError, SerializeError};
@@ -1069,7 +1070,7 @@ impl WalIngest {
 // NB: In PostgreSQL, the next-multi-xid stored in the control file is allowed to
 // go to 0, and it's fixed up by skipping to FirstMultiXactId in functions that
 // read it, like GetNewMultiXactId(). This is different from how nextXid is
-// incremented! nextXid skips over < FirstNormalTransactionId when the the value
+// incremented! nextXid skips over < FirstNormalTransactionId when the value
 // is stored, so it's never 0 in a checkpoint.
 //
 // I don't know why it's done that way, it seems less error-prone to skip over 0
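
To make the comment's contrast concrete, a sketch of the read-side fix-up it describes, assuming PostgreSQL's FirstMultiXactId value of 1 (this is an illustration, not the ingest code itself):

    const FIRST_MULTI_XACT_ID: u32 = 1;

    // next-multi-xid is normalized when read, so 0 can legally appear
    // in a checkpoint; nextXid is normalized when stored, so it cannot.
    fn next_multi_for_use(next_multi_from_checkpoint: u32) -> u32 {
        if next_multi_from_checkpoint == 0 {
            FIRST_MULTI_XACT_ID
        } else {
            next_multi_from_checkpoint
        }
    }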

View File

@@ -120,6 +120,7 @@ workspace_hack.workspace = true
 [dev-dependencies]
 assert-json-diff.workspace = true
 camino-tempfile.workspace = true
+criterion.workspace = true
 fallible-iterator.workspace = true
 flate2.workspace = true
 tokio-tungstenite.workspace = true
@@ -130,3 +131,8 @@ walkdir.workspace = true
 rand_distr = "0.4"
 tokio-postgres.workspace = true
 tracing-test = "0.2"
+[[bench]]
+name = "logging"
+harness = false
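
The harness = false line is required for criterion benches: criterion generates its own main via criterion_main!, so libtest's default bench harness must be disabled. Assuming the package is named proxy, the new benchmark then runs with:

    cargo bench -p proxy --bench logging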

proxy/benches/logging.rs (new file, 127 additions)
View File

@@ -0,0 +1,127 @@
use std::io;
use criterion::{Criterion, criterion_group, criterion_main};
use proxy::logging::{Clock, JsonLoggingLayer};
use tracing_subscriber::prelude::*;
struct DevNullWriter;
impl proxy::logging::MakeWriter for DevNullWriter {
fn make_writer(&self) -> impl io::Write {
DevNullWriter
}
}
impl<'a> tracing_subscriber::fmt::MakeWriter<'a> for DevNullWriter {
type Writer = DevNullWriter;
fn make_writer(&'a self) -> Self::Writer {
DevNullWriter
}
}
impl io::Write for DevNullWriter {
#[inline]
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
Ok(criterion::black_box(buf).len())
}
#[inline(always)]
fn flush(&mut self) -> io::Result<()> {
Ok(())
}
}
struct FixedClock;
impl Clock for FixedClock {
fn now(&self) -> chrono::DateTime<chrono::Utc> {
const { chrono::DateTime::from_timestamp_nanos(1747859990_000_000_000).to_utc() }
}
}
pub fn bench_logging(c: &mut Criterion) {
c.bench_function("text fmt current", |b| {
let registry = tracing_subscriber::Registry::default().with(
tracing_subscriber::fmt::layer()
.with_ansi(false)
.with_target(false)
.with_writer(DevNullWriter),
);
tracing::subscriber::with_default(registry, || {
tracing::info_span!("span1", a = 42, b = true, c = "string").in_scope(|| {
tracing::info_span!("span2", a = 42, b = true, c = "string").in_scope(|| {
b.iter(|| {
tracing::error!(a = 42, b = true, c = "string", "message field");
})
});
});
});
});
c.bench_function("text fmt full", |b| {
let registry = tracing_subscriber::Registry::default().with(
tracing_subscriber::fmt::layer()
.with_level(true)
.with_file(true)
.with_line_number(true)
.with_target(true)
.with_thread_ids(true)
.with_writer(DevNullWriter),
);
tracing::subscriber::with_default(registry, || {
tracing::info_span!("span1", a = 42, b = true, c = "string").in_scope(|| {
tracing::info_span!("span2", a = 42, b = true, c = "string").in_scope(|| {
b.iter(|| {
tracing::error!(a = 42, b = true, c = "string", "message field");
})
});
});
});
});
c.bench_function("json fmt", |b| {
let registry = tracing_subscriber::Registry::default().with(
tracing_subscriber::fmt::layer()
.with_level(true)
.with_file(true)
.with_line_number(true)
.with_target(true)
.with_thread_ids(true)
.with_writer(DevNullWriter)
.json(),
);
tracing::subscriber::with_default(registry, || {
tracing::info_span!("span1", a = 42, b = true, c = "string").in_scope(|| {
tracing::info_span!("span2", a = 42, b = true, c = "string").in_scope(|| {
b.iter(|| {
tracing::error!(a = 42, b = true, c = "string", "message field");
})
});
});
});
});
c.bench_function("json custom", |b| {
let registry = tracing_subscriber::Registry::default().with(JsonLoggingLayer::new(
FixedClock,
DevNullWriter,
&["a"],
));
tracing::subscriber::with_default(registry, || {
tracing::info_span!("span1", a = 42, b = true, c = "string").in_scope(|| {
tracing::info_span!("span2", a = 42, b = true, c = "string").in_scope(|| {
b.iter(|| {
tracing::error!(a = 42, b = true, c = "string", "message field");
})
});
});
});
});
}
criterion_group!(benches, bench_logging);
criterion_main!(benches);

View File

@@ -93,7 +93,7 @@ mod ext;
 mod http;
 mod intern;
 mod jemalloc;
-mod logging;
+pub mod logging;
 mod metrics;
 mod parse;
 mod pglb;

View File

@@ -148,11 +148,11 @@ impl LogFormat {
 }
 }
-trait MakeWriter {
+pub trait MakeWriter {
 fn make_writer(&self) -> impl io::Write;
 }
-struct StderrWriter {
+pub struct StderrWriter {
 stderr: io::Stderr,
 }
@@ -164,11 +164,11 @@ impl MakeWriter for StderrWriter {
 }
 // TODO: move into separate module or even separate crate.
-trait Clock {
+pub trait Clock {
 fn now(&self) -> DateTime<Utc>;
 }
-struct RealClock;
+pub struct RealClock;
 impl Clock for RealClock {
 #[inline]
@@ -203,7 +203,7 @@ type CallsiteMap<T> =
 papaya::HashMap<callsite::Identifier, T, std::hash::BuildHasherDefault<rustc_hash::FxHasher>>;
 /// Implements tracing layer to handle events specific to logging.
-struct JsonLoggingLayer<C: Clock, W: MakeWriter> {
+pub struct JsonLoggingLayer<C: Clock, W: MakeWriter> {
 clock: C,
 writer: W,
@@ -217,7 +217,7 @@ struct JsonLoggingLayer<C: Clock, W: MakeWriter> {
 }
 impl<C: Clock, W: MakeWriter> JsonLoggingLayer<C, W> {
-fn new(clock: C, writer: W, extract_fields: &'static [&'static str]) -> Self {
+pub fn new(clock: C, writer: W, extract_fields: &'static [&'static str]) -> Self {
 JsonLoggingLayer {
 clock,
 skipped_field_indices: CallsiteMap::default(),
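
These visibility changes exist so the new benchmark can drive the layer from outside the crate. A minimal sketch of such a consumer, assuming only the signatures shown in this hunk (the sink type and the "request_id" field are hypothetical placeholders):

    use proxy::logging::{Clock, JsonLoggingLayer, MakeWriter};
    use tracing_subscriber::prelude::*;

    struct StderrSink;
    impl MakeWriter for StderrSink {
        fn make_writer(&self) -> impl std::io::Write {
            std::io::stderr()
        }
    }

    struct SystemClock;
    impl Clock for SystemClock {
        fn now(&self) -> chrono::DateTime<chrono::Utc> {
            chrono::Utc::now()
        }
    }

    fn install_logging() {
        let layer = JsonLoggingLayer::new(SystemClock, StderrSink, &["request_id"]);
        tracing_subscriber::registry().with(layer).init();
    }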

View File

@@ -110,7 +110,7 @@ where
 debug!(error = ?err, COULD_NOT_CONNECT);
 let node_info = if !node_info.cached() || !err.should_retry_wake_compute() {
-// If we just recieved this from cplane and didn't get it from cache, we shouldn't retry.
+// If we just received this from cplane and not from the cache, we shouldn't retry.
 // Do not need to retrieve a new node_info, just return the old one.
 if !should_retry(&err, num_retries, compute.retry) {
 Metrics::get().proxy.retries_metric.observe(

View File

@@ -58,6 +58,7 @@ metrics.workspace = true
 pem.workspace = true
 postgres_backend.workspace = true
 postgres_ffi.workspace = true
+postgres_ffi_types.workspace = true
 postgres_versioninfo.workspace = true
 pq_proto.workspace = true
 remote_storage.workspace = true

View File

@@ -12,7 +12,8 @@ use futures::FutureExt;
 use itertools::Itertools;
 use parking_lot::Mutex;
 use postgres_backend::{CopyStreamHandlerEnd, PostgresBackend, PostgresBackendReader, QueryError};
-use postgres_ffi::{MAX_SEND_SIZE, PgMajorVersion, TimestampTz, get_current_timestamp};
+use postgres_ffi::{MAX_SEND_SIZE, PgMajorVersion, get_current_timestamp};
+use postgres_ffi_types::TimestampTz;
 use pq_proto::{BeMessage, WalSndKeepAlive, XLogDataBody};
 use safekeeper_api::Term;
 use safekeeper_api::models::{

View File

@@ -728,7 +728,7 @@ class NeonEnvBuilder:
 # NB: neon_local rewrites postgresql.conf on each start based on neon_local config. No need to patch it.
 # However, in this new NeonEnv, the pageservers and safekeepers listen on different ports, and the storage
 # controller will currently reject re-attach requests from them because the NodeMetadata isn't identical.
-# So, from_repo_dir patches up the the storcon database.
+# So, from_repo_dir patches up the storcon database.
 patch_script_path = self.repo_dir / "storage_controller_db.startup.sql"
 assert not patch_script_path.exists()
 patch_script = ""

View File

@@ -24,10 +24,7 @@ def test_local_corruption(neon_env_builder: NeonEnvBuilder):
 [
 ".*get_values_reconstruct_data for layer .*",
 ".*could not find data for key.*",
 ".*is not active. Current state: Broken.*",
-".*will not become active. Current state: Broken.*",
-".*failed to load metadata.*",
-".*load failed.*load local timeline.*",
 ".*: layer load failed, assuming permanent failure:.*",
 ".*failed to get checkpoint bytes.*",
 ".*failed to get control bytes.*",

View File

@@ -687,7 +687,7 @@ def test_sharding_compaction(
 for _i in range(0, 10):
 # Each of these does some writes then a checkpoint: because we set image_creation_threshold to 1,
 # these should result in image layers each time we write some data into a shard, and also shards
-# recieving less data hitting their "empty image layer" path (wherre they should skip writing the layer,
+# receiving less data hitting their "empty image layer" path (where they should skip writing the layer,
 # rather than asserting)
 workload.churn_rows(64)

View File

@@ -20,6 +20,9 @@ from fixtures.remote_storage import LocalFsStorage, RemoteStorageKind
 from fixtures.utils import query_scalar, wait_until
+@pytest.mark.skip(
+reason="We won't create future layers any more after https://github.com/neondatabase/neon/pull/10548"
+)
 @pytest.mark.parametrize(
 "attach_mode",
 ["default_generation", "same_generation"],

View File

@@ -76,7 +76,6 @@ def test_tenants_normal_work(neon_env_builder: NeonEnvBuilder):
 neon_env_builder.num_safekeepers = 3
 env = neon_env_builder.init_start()
-"""Tests tenants with and without wal acceptors"""
 tenant_1, _ = env.create_tenant()
 tenant_2, _ = env.create_tenant()