Compare commits

..

3 Commits

Author SHA1 Message Date
Folke Behrens
ef737e7d7c proxy: add benchmark for custom json logging vs official fmt logger 2025-07-15 19:44:41 +02:00
Arpad Müller
5c934efb29 Don't depend on the postgres_ffi just for one type (#12610)
We don't want to depend on postgres_ffi in an API crate. If there is no
such dependency, we can compile stuff like `storcon_cli` without needing
a full working postgres build. Fixes regression of #12548 (before we
could compile it).
2025-07-15 17:28:08 +00:00
Heikki Linnakangas
5c9c3b3317 Misc cosmetic cleanups (#12598)
- Remove a few obsolete "allowed error messages" from tests. The
pageserver doesn't emit those messages anymore.

- Remove misplaced and outdated docstring comment from
`test_tenants.py`. A docstring is supposed to be the first thing in a
function, but we had added some code before it. And it was outdated, as
we haven't supported running without safekeepers for a long time.

- Fix misc typos in comments

- Remove obsolete comment about backwards compatibility with safekeepers
without `TIMELINE_STATUS` API. All safekeepers have it by now.
2025-07-15 14:36:28 +00:00
42 changed files with 397 additions and 287 deletions

4
Cargo.lock generated
View File

@@ -5303,6 +5303,7 @@ dependencies = [
"clashmap",
"compute_api",
"consumption_metrics",
"criterion",
"ecdsa 0.16.9",
"ed25519-dalek",
"env_logger",
@@ -6211,6 +6212,7 @@ dependencies = [
"postgres-protocol",
"postgres_backend",
"postgres_ffi",
"postgres_ffi_types",
"postgres_versioninfo",
"pprof",
"pq_proto",
@@ -6255,7 +6257,7 @@ dependencies = [
"anyhow",
"const_format",
"pageserver_api",
"postgres_ffi",
"postgres_ffi_types",
"postgres_versioninfo",
"pq_proto",
"serde",

View File

@@ -1286,9 +1286,7 @@ impl ComputeNode {
// In case of error, log and fail the check, but don't crash.
// We're playing it safe because these errors could be transient
// and we don't yet retry. Also being careful here allows us to
// be backwards compatible with safekeepers that don't have the
// TIMELINE_STATUS API yet.
// and we don't yet retry.
if responses.len() < quorum {
error!(
"failed sync safekeepers check {:?} {:?} {:?}",

View File

@@ -464,7 +464,7 @@ impl Endpoint {
conf.append("max_connections", "100");
conf.append("wal_level", "logical");
// wal_sender_timeout is the maximum time to wait for WAL replication.
// It also defines how often the walreciever will send a feedback message to the wal sender.
// It also defines how often the walreceiver will send a feedback message to the wal sender.
conf.append("wal_sender_timeout", "5s");
conf.append("listen_addresses", &self.pg_address.ip().to_string());
conf.append("port", &self.pg_address.port().to_string());

View File

@@ -75,7 +75,7 @@ CLI examples:
* AWS S3 : `env AWS_ACCESS_KEY_ID='SOMEKEYAAAAASADSAH*#' AWS_SECRET_ACCESS_KEY='SOMEsEcReTsd292v' ${PAGESERVER_BIN} -c "remote_storage={bucket_name='some-sample-bucket',bucket_region='eu-north-1', prefix_in_bucket='/test_prefix/'}"`
For Amazon AWS S3, a key id and secret access key could be located in `~/.aws/credentials` if awscli was ever configured to work with the desired bucket, on the AWS Settings page for a certain user. Also note, that the bucket names does not contain any protocols when used on AWS.
For local S3 installations, refer to the their documentation for name format and credentials.
For local S3 installations, refer to their documentation for name format and credentials.
Similar to other pageserver settings, toml config file can be used to configure either of the storages as backup targets.
Required sections are:

View File

@@ -110,7 +110,6 @@ fn main() -> anyhow::Result<()> {
.allowlist_type("XLogRecPtr")
.allowlist_type("XLogSegNo")
.allowlist_type("TimeLineID")
.allowlist_type("TimestampTz")
.allowlist_type("MultiXactId")
.allowlist_type("MultiXactOffset")
.allowlist_type("MultiXactStatus")

View File

@@ -227,8 +227,7 @@ pub mod walrecord;
// Export some widely used datatypes that are unlikely to change across Postgres versions
pub use v14::bindings::{
BlockNumber, CheckPoint, ControlFileData, MultiXactId, OffsetNumber, Oid, PageHeaderData,
RepOriginId, TimeLineID, TimestampTz, TransactionId, XLogRecPtr, XLogRecord, XLogSegNo, uint32,
uint64,
RepOriginId, TimeLineID, TransactionId, XLogRecPtr, XLogRecord, XLogSegNo, uint32, uint64,
};
// Likewise for these, although the assumption that these don't change is a little more iffy.
pub use v14::bindings::{MultiXactOffset, MultiXactStatus};

View File

@@ -4,13 +4,14 @@
//! TODO: Generate separate types for each supported PG version
use bytes::{Buf, Bytes};
use postgres_ffi_types::TimestampTz;
use serde::{Deserialize, Serialize};
use utils::bin_ser::DeserializeError;
use utils::lsn::Lsn;
use crate::{
BLCKSZ, BlockNumber, MultiXactId, MultiXactOffset, MultiXactStatus, Oid, PgMajorVersion,
RepOriginId, TimestampTz, TransactionId, XLOG_SIZE_OF_XLOG_RECORD, XLogRecord, pg_constants,
RepOriginId, TransactionId, XLOG_SIZE_OF_XLOG_RECORD, XLogRecord, pg_constants,
};
#[repr(C)]
@@ -863,7 +864,8 @@ pub mod v17 {
XlHeapDelete, XlHeapInsert, XlHeapLock, XlHeapMultiInsert, XlHeapUpdate, XlParameterChange,
rm_neon,
};
pub use crate::{TimeLineID, TimestampTz};
pub use crate::TimeLineID;
pub use postgres_ffi_types::TimestampTz;
#[repr(C)]
#[derive(Debug)]

View File

@@ -9,10 +9,11 @@
use super::super::waldecoder::WalStreamDecoder;
use super::bindings::{
CheckPoint, ControlFileData, DBState_DB_SHUTDOWNED, FullTransactionId, TimeLineID, TimestampTz,
CheckPoint, ControlFileData, DBState_DB_SHUTDOWNED, FullTransactionId, TimeLineID,
XLogLongPageHeaderData, XLogPageHeaderData, XLogRecPtr, XLogRecord, XLogSegNo, XLOG_PAGE_MAGIC,
MY_PGVERSION
};
use postgres_ffi_types::TimestampTz;
use super::wal_generator::LogicalMessageGenerator;
use crate::pg_constants;
use crate::PG_TLI;

View File

@@ -11,3 +11,4 @@ pub mod forknum;
pub type Oid = u32;
pub type RepOriginId = u16;
pub type TimestampTz = i64;

View File

@@ -9,7 +9,7 @@ anyhow.workspace = true
const_format.workspace = true
serde.workspace = true
serde_json.workspace = true
postgres_ffi.workspace = true
postgres_ffi_types.workspace = true
postgres_versioninfo.workspace = true
pq_proto.workspace = true
tokio.workspace = true

View File

@@ -3,7 +3,7 @@
use std::net::SocketAddr;
use pageserver_api::shard::ShardIdentity;
use postgres_ffi::TimestampTz;
use postgres_ffi_types::TimestampTz;
use postgres_versioninfo::PgVersionId;
use serde::{Deserialize, Serialize};
use tokio::time::Instant;

View File

@@ -2,7 +2,8 @@
use bytes::Bytes;
use postgres_ffi::walrecord::{MultiXactMember, describe_postgres_wal_record};
use postgres_ffi::{MultiXactId, MultiXactOffset, TimestampTz, TransactionId};
use postgres_ffi::{MultiXactId, MultiXactOffset, TransactionId};
use postgres_ffi_types::TimestampTz;
use serde::{Deserialize, Serialize};
use utils::bin_ser::DeserializeError;

View File

@@ -1,5 +1,5 @@
//! The validator is responsible for validating DeletionLists for execution,
//! based on whethe the generation in the DeletionList is still the latest
//! based on whether the generation in the DeletionList is still the latest
//! generation for a tenant.
//!
//! The purpose of validation is to ensure split-brain safety in the cluster

View File

@@ -25,9 +25,9 @@ use pageserver_api::keyspace::{KeySpaceRandomAccum, SparseKeySpace};
use pageserver_api::models::RelSizeMigration;
use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind};
use pageserver_api::shard::ShardIdentity;
use postgres_ffi::{BLCKSZ, PgMajorVersion, TimestampTz, TransactionId};
use postgres_ffi::{BLCKSZ, PgMajorVersion, TransactionId};
use postgres_ffi_types::forknum::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM};
use postgres_ffi_types::{Oid, RepOriginId};
use postgres_ffi_types::{Oid, RepOriginId, TimestampTz};
use serde::{Deserialize, Serialize};
use strum::IntoEnumIterator;
use tokio_util::sync::CancellationToken;

View File

@@ -1,6 +1,6 @@
//! An utilization metric which is used to decide on which pageserver to put next tenant.
//!
//! The metric is exposed via `GET /v1/utilization`. Refer and maintain it's openapi spec as the
//! The metric is exposed via `GET /v1/utilization`. Refer and maintain its openapi spec as the
//! truth.
use std::path::Path;

View File

@@ -32,9 +32,10 @@ use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind};
use pageserver_api::shard::ShardIdentity;
use postgres_ffi::walrecord::*;
use postgres_ffi::{
PgMajorVersion, TimestampTz, TransactionId, dispatch_pgversion, enum_pgversion,
enum_pgversion_dispatch, fsm_logical_to_physical, pg_constants,
PgMajorVersion, TransactionId, dispatch_pgversion, enum_pgversion, enum_pgversion_dispatch,
fsm_logical_to_physical, pg_constants,
};
use postgres_ffi_types::TimestampTz;
use postgres_ffi_types::forknum::{FSM_FORKNUM, INIT_FORKNUM, MAIN_FORKNUM, VISIBILITYMAP_FORKNUM};
use tracing::*;
use utils::bin_ser::{DeserializeError, SerializeError};
@@ -1069,7 +1070,7 @@ impl WalIngest {
// NB: In PostgreSQL, the next-multi-xid stored in the control file is allowed to
// go to 0, and it's fixed up by skipping to FirstMultiXactId in functions that
// read it, like GetNewMultiXactId(). This is different from how nextXid is
// incremented! nextXid skips over < FirstNormalTransactionId when the the value
// incremented! nextXid skips over < FirstNormalTransactionId when the value
// is stored, so it's never 0 in a checkpoint.
//
// I don't know why it's done that way, it seems less error-prone to skip over 0

View File

@@ -38,8 +38,6 @@ DATA = \
neon--1.3--1.4.sql \
neon--1.4--1.5.sql \
neon--1.5--1.6.sql \
neon--1.6--1.7.sql \
neon--1.7--1.6.sql \
neon--1.6--1.5.sql \
neon--1.5--1.4.sql \
neon--1.4--1.3.sql \

View File

@@ -54,7 +54,6 @@
*/
#include "postgres.h"
#include "access/twophase.h"
#include "access/xlog.h"
#include "access/xlogdefs.h"
#include "access/xlog_internal.h"
@@ -65,7 +64,6 @@
#include "miscadmin.h"
#include "port/pg_iovec.h"
#include "postmaster/interrupt.h"
#include "postmaster/postmaster.h"
#include "replication/walsender.h"
#include "storage/ipc.h"
#include "utils/timeout.h"
@@ -77,18 +75,11 @@
#include "neon_perf_counters.h"
#include "pagestore_client.h"
#if PG_MAJORVERSION_NUM >= 17
#include "storage/procnumber.h"
#else
#define MyProcNumber MyProc->pgprocno
#endif
#if PG_MAJORVERSION_NUM >= 15
#if PG_VERSION_NUM >= 150000
#include "access/xlogrecovery.h"
#endif
#if PG_MAJORVERSION_NUM < 16
#if PG_VERSION_NUM < 160000
typedef PGAlignedBlock PGIOAlignedBlock;
#endif
@@ -303,15 +294,6 @@ static PrefetchState *MyPState;
static process_interrupts_callback_t prev_interrupt_cb;
/*
* Array in shared memory each cell of which contains minimal in-flight request LSN sent to PS by the backend which procno is
* used as index in this array. This array is initially filled with InfiniteXlogRecPtr (UINT64_MAX) so if backend
* didn't send any request to PS, then this value doesn't effect global min.
*
* We support only 64-bit platforms and so assume that access to array elements is atomic and no any synchronization is needed.
*/
static XLogRecPtr* minPrefetchLsn;
static bool compact_prefetch_buffers(void);
static void consume_prefetch_responses(void);
static uint64 prefetch_register_bufferv(BufferTag tag, neon_request_lsns *frlsns,
@@ -334,41 +316,6 @@ pg_init_communicator(void)
ProcessInterruptsCallback = communicator_processinterrupts;
}
static Size
CommunicatorShmemSize(void)
{
#if PG_MAJORVERSION_NUM >= 15
Assert(MaxBackends != 0);
return (MaxBackends + NUM_AUXILIARY_PROCS + max_prepared_xacts) * sizeof(XLogRecPtr);
#else
return (MAX_BACKENDS + NUM_AUXILIARY_PROCS + max_prepared_xacts) * sizeof(XLogRecPtr);
#endif
}
void
CommunicatorShmemRequest(void)
{
RequestAddinShmemSpace(CommunicatorShmemSize());
}
void
CommunicatorShmemInit(void)
{
bool found;
minPrefetchLsn = (XLogRecPtr*)ShmemInitStruct("Communicator shared state",
CommunicatorShmemSize(),
&found);
if (!found)
{
/*
* Fill with InfiniteXLogRecPtr (UINT64_MAX).
* If backend didn't send any requests to PS, then InfiniteXLogRecPtr doesn't affect global minimal value.
*/
memset(minPrefetchLsn, 0xFF, CommunicatorShmemSize());
}
}
static bool
compact_prefetch_buffers(void)
{
@@ -507,20 +454,6 @@ check_getpage_response(PrefetchRequest* slot, NeonResponse* resp)
}
}
/*
* Update min in-flight prefetch LSN for this backend.
*/
static void
update_min_prefetch_lsn(uint64 ring_index)
{
if (ring_index + 1 < MyPState->ring_unused)
{
PrefetchRequest* next_slot = GetPrfSlot(ring_index + 1);
Assert(minPrefetchLsn[MyProcNumber] <= next_slot->request_lsns.request_lsn);
minPrefetchLsn[MyProcNumber] = next_slot->request_lsns.request_lsn;
}
}
/*
* If there might be responses still in the TCP buffer, then we should try to
* use those, to reduce any TCP backpressure on the OS/PS side.
@@ -545,9 +478,8 @@ communicator_prefetch_pump_state(void)
NeonResponse *response;
PrefetchRequest *slot;
MemoryContext old;
uint64 my_ring_index = MyPState->ring_receive;
slot = GetPrfSlot(my_ring_index);
slot = GetPrfSlot(MyPState->ring_receive);
old = MemoryContextSwitchTo(MyPState->errctx);
response = page_server->try_receive(slot->shard_no);
@@ -556,19 +488,17 @@ communicator_prefetch_pump_state(void)
if (response == NULL)
break;
update_min_prefetch_lsn(my_ring_index);
check_getpage_response(slot, response);
/* The slot should still be valid */
if (slot->status != PRFS_REQUESTED ||
slot->response != NULL ||
slot->my_ring_index != my_ring_index)
slot->my_ring_index != MyPState->ring_receive)
{
neon_shard_log(slot->shard_no, PANIC,
"Incorrect prefetch slot state after receive: status=%d response=%p my=" UINT64_FORMAT " receive=" UINT64_FORMAT "",
slot->status, slot->response,
slot->my_ring_index, my_ring_index);
slot->my_ring_index, MyPState->ring_receive);
}
/* update prefetch state */
MyPState->n_responses_buffered += 1;
@@ -735,9 +665,6 @@ consume_prefetch_responses(void)
{
if (MyPState->ring_receive < MyPState->ring_unused)
prefetch_wait_for(MyPState->ring_unused - 1);
minPrefetchLsn[MyProcNumber] = InfiniteXLogRecPtr; /* No more in-flight prefetch requests from this backend */
/*
* We know for sure we're not working on any prefetch pages after
* this.
@@ -879,9 +806,6 @@ prefetch_read(PrefetchRequest *slot)
old = MemoryContextSwitchTo(MyPState->errctx);
response = (NeonResponse *) page_server->receive(shard_no);
MemoryContextSwitchTo(old);
update_min_prefetch_lsn(my_ring_index);
if (response)
{
check_getpage_response(slot, response);
@@ -1000,8 +924,6 @@ prefetch_on_ps_disconnect(void)
MyNeonCounters->getpage_prefetch_discards_total += 1;
}
minPrefetchLsn[MyProcNumber] = InfiniteXLogRecPtr; /* No more in-flight prefetch requests from this backend */
/*
* We can have gone into retry due to network error, so update stats with
* the latest available
@@ -1103,8 +1025,6 @@ prefetch_do_request(PrefetchRequest *slot, neon_request_lsns *force_request_lsns
Assert(slot->response == NULL);
Assert(slot->my_ring_index == MyPState->ring_unused);
minPrefetchLsn[MyProcNumber] = Min(request.hdr.lsn, minPrefetchLsn[MyProcNumber]);
while (!page_server->send(slot->shard_no, (NeonRequest *) &request))
{
Assert(mySlotNo == MyPState->ring_unused);
@@ -1125,23 +1045,6 @@ prefetch_do_request(PrefetchRequest *slot, neon_request_lsns *force_request_lsns
Assert(!found);
}
/*
* Check that returned page LSN is consistent with request lsns
*/
static void
check_page_lsn(NeonGetPageResponse* resp)
{
if (PageGetLSN(resp->page) > resp->req.hdr.not_modified_since)
neon_log(PANIC, "Invalid getpage response version: %X/%08X is higher than last modified LSN %X/%08X",
LSN_FORMAT_ARGS(PageGetLSN(resp->page)),
LSN_FORMAT_ARGS(resp->req.hdr.not_modified_since));
if (PageGetLSN(resp->page) > resp->req.hdr.lsn)
neon_log(PANIC, "Invalid getpage response version: %X/%08X is higher than request LSN %X/%08X",
LSN_FORMAT_ARGS(PageGetLSN(resp->page)),
LSN_FORMAT_ARGS(resp->req.hdr.lsn));
}
/*
* Lookup of already received prefetch requests. Only already received responses matching required LSNs are accepted.
* Present pages are marked in "mask" bitmap and total number of such pages is returned.
@@ -1165,7 +1068,7 @@ communicator_prefetch_lookupv(NRelFileInfo rinfo, ForkNumber forknum, BlockNumbe
for (int i = 0; i < nblocks; i++)
{
PrfHashEntry *entry;
NeonGetPageResponse* resp;
hashkey.buftag.blockNum = blocknum + i;
entry = prfh_lookup(MyPState->prf_hash, &hashkey);
@@ -1198,9 +1101,8 @@ communicator_prefetch_lookupv(NRelFileInfo rinfo, ForkNumber forknum, BlockNumbe
continue;
}
Assert(slot->response->tag == T_NeonGetPageResponse); /* checked by check_getpage_response when response was assigned to the slot */
resp = (NeonGetPageResponse*)slot->response;
check_page_lsn(resp);
memcpy(buffers[i], resp->page, BLCKSZ);
memcpy(buffers[i], ((NeonGetPageResponse*)slot->response)->page, BLCKSZ);
/*
* With lfc_store_prefetch_result=true prefetch result is stored in LFC in prefetch_pump_state when response is received
@@ -1551,7 +1453,6 @@ page_server_request(void const *req)
PG_TRY();
{
before_shmem_exit(prefetch_on_exit, Int32GetDatum(shard_no));
minPrefetchLsn[MyProcNumber] = ((NeonRequest *)req)->lsn;
do
{
while (!page_server->send(shard_no, (NeonRequest *) req)
@@ -1563,12 +1464,10 @@ page_server_request(void const *req)
resp = page_server->receive(shard_no);
MyNeonCounters->pageserver_open_requests--;
} while (resp == NULL);
minPrefetchLsn[MyProcNumber] = InfiniteXLogRecPtr;
cancel_before_shmem_exit(prefetch_on_exit, Int32GetDatum(shard_no));
}
PG_CATCH();
{
minPrefetchLsn[MyProcNumber] = InfiniteXLogRecPtr;
cancel_before_shmem_exit(prefetch_on_exit, Int32GetDatum(shard_no));
/* Nothing should cancel disconnect: we should not leave connection in opaque state */
HOLD_INTERRUPTS();
@@ -1989,7 +1888,7 @@ communicator_init(void)
* the check here. That's OK, we don't expect the logic to change in old
* releases.
*/
#if PG_MAJORVERSION_NUM >= 15
#if PG_VERSION_NUM>=150000
if (MyNeonCounters >= &neon_per_backend_counters_shared[NUM_NEON_PERF_COUNTER_SLOTS])
elog(ERROR, "MyNeonCounters points past end of array");
#endif
@@ -2068,7 +1967,7 @@ neon_prefetch_response_usable(neon_request_lsns *request_lsns,
* Each request to the pageserver has three LSN values associated with it:
* `not_modified_since`, `request_lsn`, and 'effective_request_lsn'.
* `not_modified_since` and `request_lsn` are sent to the pageserver, but
* in the primary node, we always use InfiniteXLogRecPtr as the `request_lsn`, so
* in the primary node, we always use UINT64_MAX as the `request_lsn`, so
* we remember `effective_request_lsn` separately. In a primary,
* `effective_request_lsn` is the same as `not_modified_since`.
* See comments in neon_get_request_lsns why we can not use last flush WAL position here.
@@ -2328,7 +2227,6 @@ Retry:
case T_NeonGetPageResponse:
{
NeonGetPageResponse* getpage_resp = (NeonGetPageResponse *) resp;
check_page_lsn(getpage_resp);
memcpy(buffer, getpage_resp->page, BLCKSZ);
/*
@@ -2526,18 +2424,15 @@ communicator_read_slru_segment(SlruKind kind, int64 segno, neon_request_lsns *re
PG_TRY();
{
before_shmem_exit(prefetch_on_exit, Int32GetDatum(shard_no));
minPrefetchLsn[MyProcNumber] = request_lsns->request_lsn;
do
{
while (!page_server->send(shard_no, &request.hdr) || !page_server->flush(shard_no));
resp = page_server->receive(shard_no);
} while (resp == NULL);
minPrefetchLsn[MyProcNumber] = InfiniteXLogRecPtr;
cancel_before_shmem_exit(prefetch_on_exit, Int32GetDatum(shard_no));
}
PG_CATCH();
{
minPrefetchLsn[MyProcNumber] = InfiniteXLogRecPtr;
cancel_before_shmem_exit(prefetch_on_exit, Int32GetDatum(shard_no));
/* Nothing should cancel disconnect: we should not leave connection in opaque state */
HOLD_INTERRUPTS();
@@ -2682,19 +2577,3 @@ communicator_processinterrupts(void)
return prev_interrupt_cb();
}
PG_FUNCTION_INFO_V1(neon_communicator_min_inflight_request_lsn);
Datum
neon_communicator_min_inflight_request_lsn(PG_FUNCTION_ARGS)
{
XLogRecPtr min_lsn = RecoveryInProgress()
? GetXLogReplayRecPtr(NULL)
: InfiniteXLogRecPtr;
size_t n_procs = ProcGlobal->allProcCount;
for (size_t i = 0; i < n_procs; i++)
{
min_lsn = Min(min_lsn, minPrefetchLsn[i]);
}
PG_RETURN_INT64(min_lsn);
}

View File

@@ -46,4 +46,5 @@ extern int communicator_read_slru_segment(SlruKind kind, int64 segno,
extern void communicator_reconfigure_timeout_if_needed(void);
extern void communicator_prefetch_pump_state(void);
#endif

View File

@@ -219,6 +219,10 @@ static char *lfc_path;
static uint64 lfc_generation;
static FileCacheControl *lfc_ctl;
static bool lfc_do_prewarm;
static shmem_startup_hook_type prev_shmem_startup_hook;
#if PG_VERSION_NUM>=150000
static shmem_request_hook_type prev_shmem_request_hook;
#endif
bool lfc_store_prefetch_result;
bool lfc_prewarm_update_ws_estimation;
@@ -338,14 +342,18 @@ lfc_ensure_opened(void)
return true;
}
void
LfcShmemInit(void)
static void
lfc_shmem_startup(void)
{
bool found;
static HASHCTL info;
if (lfc_max_size <= 0)
return;
if (prev_shmem_startup_hook)
{
prev_shmem_startup_hook();
}
LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);
lfc_ctl = (FileCacheControl *) ShmemInitStruct("lfc", sizeof(FileCacheControl), &found);
if (!found)
@@ -390,16 +398,19 @@ LfcShmemInit(void)
ConditionVariableInit(&lfc_ctl->cv[i]);
}
LWLockRelease(AddinShmemInitLock);
}
void
LfcShmemRequest(void)
static void
lfc_shmem_request(void)
{
if (lfc_max_size > 0)
{
RequestAddinShmemSpace(sizeof(FileCacheControl) + hash_estimate_size(SIZE_MB_TO_CHUNKS(lfc_max_size) + 1, FILE_CACHE_ENRTY_SIZE));
RequestNamedLWLockTranche("lfc_lock", 1);
}
#if PG_VERSION_NUM>=150000
if (prev_shmem_request_hook)
prev_shmem_request_hook();
#endif
RequestAddinShmemSpace(sizeof(FileCacheControl) + hash_estimate_size(SIZE_MB_TO_CHUNKS(lfc_max_size) + 1, FILE_CACHE_ENRTY_SIZE));
RequestNamedLWLockTranche("lfc_lock", 1);
}
static bool
@@ -631,6 +642,18 @@ lfc_init(void)
NULL,
NULL,
NULL);
if (lfc_max_size == 0)
return;
prev_shmem_startup_hook = shmem_startup_hook;
shmem_startup_hook = lfc_shmem_startup;
#if PG_VERSION_NUM>=150000
prev_shmem_request_hook = shmem_request_hook;
shmem_request_hook = lfc_shmem_request;
#else
lfc_shmem_request();
#endif
}
FileCacheState*

View File

@@ -118,6 +118,10 @@ typedef struct
ShardMap shard_map;
} PagestoreShmemState;
#if PG_VERSION_NUM >= 150000
static shmem_request_hook_type prev_shmem_request_hook = NULL;
#endif
static shmem_startup_hook_type prev_shmem_startup_hook;
static PagestoreShmemState *pagestore_shared;
static uint64 pagestore_local_counter = 0;
@@ -1280,12 +1284,18 @@ check_neon_id(char **newval, void **extra, GucSource source)
return **newval == '\0' || HexDecodeString(id, *newval, 16);
}
static Size
PagestoreShmemSize(void)
{
return add_size(sizeof(PagestoreShmemState), NeonPerfCountersShmemSize());
}
void
static bool
PagestoreShmemInit(void)
{
bool found;
LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);
pagestore_shared = ShmemInitStruct("libpagestore shared state",
sizeof(PagestoreShmemState),
&found);
@@ -1296,12 +1306,44 @@ PagestoreShmemInit(void)
memset(&pagestore_shared->shard_map, 0, sizeof(ShardMap));
AssignPageserverConnstring(page_server_connstring, NULL);
}
NeonPerfCountersShmemInit();
LWLockRelease(AddinShmemInitLock);
return found;
}
void
PagestoreShmemRequest(void)
static void
pagestore_shmem_startup_hook(void)
{
RequestAddinShmemSpace(sizeof(PagestoreShmemState));
if (prev_shmem_startup_hook)
prev_shmem_startup_hook();
PagestoreShmemInit();
}
static void
pagestore_shmem_request(void)
{
#if PG_VERSION_NUM >= 150000
if (prev_shmem_request_hook)
prev_shmem_request_hook();
#endif
RequestAddinShmemSpace(PagestoreShmemSize());
}
static void
pagestore_prepare_shmem(void)
{
#if PG_VERSION_NUM >= 150000
prev_shmem_request_hook = shmem_request_hook;
shmem_request_hook = pagestore_shmem_request;
#else
pagestore_shmem_request();
#endif
prev_shmem_startup_hook = shmem_startup_hook;
shmem_startup_hook = pagestore_shmem_startup_hook;
}
/*
@@ -1310,6 +1352,8 @@ PagestoreShmemRequest(void)
void
pg_init_libpagestore(void)
{
pagestore_prepare_shmem();
DefineCustomStringVariable("neon.pageserver_connstring",
"connection string to the page server",
NULL,
@@ -1460,6 +1504,8 @@ pg_init_libpagestore(void)
0,
NULL, NULL, NULL);
relsize_hash_init();
if (page_server != NULL)
neon_log(ERROR, "libpagestore already loaded");

View File

@@ -1,3 +0,0 @@
create function neon_communicator_min_inflight_request_lsn() returns pg_catalog.pg_lsn
AS 'MODULE_PATHNAME', 'neon_communicator_min_inflight_request_lsn'
LANGUAGE C;

View File

@@ -1 +0,0 @@
drop function neon_communicator_min_inflight_request_lsn();

View File

@@ -22,7 +22,6 @@
#include "replication/slot.h"
#include "replication/walsender.h"
#include "storage/proc.h"
#include "storage/ipc.h"
#include "funcapi.h"
#include "access/htup_details.h"
#include "utils/builtins.h"
@@ -60,15 +59,11 @@ static ExecutorEnd_hook_type prev_ExecutorEnd = NULL;
static void neon_ExecutorStart(QueryDesc *queryDesc, int eflags);
static void neon_ExecutorEnd(QueryDesc *queryDesc);
#if PG_MAJORVERSION_NUM >= 16
static shmem_startup_hook_type prev_shmem_startup_hook;
static void neon_shmem_startup_hook(void);
static void neon_shmem_request_hook(void);
#if PG_MAJORVERSION_NUM >= 15
static shmem_request_hook_type prev_shmem_request_hook = NULL;
#endif
#if PG_MAJORVERSION_NUM >= 17
uint32 WAIT_EVENT_NEON_LFC_MAINTENANCE;
uint32 WAIT_EVENT_NEON_LFC_READ;
@@ -455,13 +450,15 @@ _PG_init(void)
*/
#if PG_VERSION_NUM >= 160000
load_file("$libdir/neon_rmgr", false);
prev_shmem_startup_hook = shmem_startup_hook;
shmem_startup_hook = neon_shmem_startup_hook;
#endif
/* dummy call to a Rust function in the communicator library, to check that it works */
(void) communicator_dummy(123);
pg_init_libpagestore();
relsize_hash_init();
lfc_init();
pg_init_walproposer();
init_lwlsncache();
@@ -555,16 +552,6 @@ _PG_init(void)
ReportSearchPath();
#if PG_VERSION_NUM >= 150000
prev_shmem_request_hook = shmem_request_hook;
shmem_request_hook = neon_shmem_request_hook;
#else
neon_shmem_request_hook();
#endif
prev_shmem_startup_hook = shmem_startup_hook;
shmem_startup_hook = neon_shmem_startup_hook;
prev_ExecutorStart = ExecutorStart_hook;
ExecutorStart_hook = neon_ExecutorStart;
prev_ExecutorEnd = ExecutorEnd_hook;
@@ -650,24 +637,7 @@ approximate_working_set_size(PG_FUNCTION_ARGS)
PG_RETURN_INT32(dc);
}
static void
neon_shmem_request_hook(void)
{
#if PG_VERSION_NUM >= 150000
if (prev_shmem_request_hook)
prev_shmem_request_hook();
#endif
LfcShmemRequest();
NeonPerfCountersShmemRequest();
PagestoreShmemRequest();
RelsizeCacheShmemRequest();
CommunicatorShmemRequest();
WalproposerShmemRequest();
LwLsnCacheShmemRequest();
}
#if PG_MAJORVERSION_NUM >= 16
static void
neon_shmem_startup_hook(void)
{
@@ -675,16 +645,6 @@ neon_shmem_startup_hook(void)
if (prev_shmem_startup_hook)
prev_shmem_startup_hook();
LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);
LfcShmemInit();
NeonPerfCountersShmemInit();
PagestoreShmemInit();
RelsizeCacheShmemInit();
CommunicatorShmemInit();
WalproposerShmemInit();
LwLsnCacheShmemInit();
#if PG_MAJORVERSION_NUM >= 17
WAIT_EVENT_NEON_LFC_MAINTENANCE = WaitEventExtensionNew("Neon/FileCache_Maintenance");
WAIT_EVENT_NEON_LFC_READ = WaitEventExtensionNew("Neon/FileCache_Read");
@@ -697,9 +657,8 @@ neon_shmem_startup_hook(void)
WAIT_EVENT_NEON_PS_READ = WaitEventExtensionNew("Neon/PS_ReadIO");
WAIT_EVENT_NEON_WAL_DL = WaitEventExtensionNew("Neon/WAL_Download");
#endif
LWLockRelease(AddinShmemInitLock);
}
#endif
/*
* ExecutorStart hook: start up tracking if needed

View File

@@ -58,7 +58,6 @@ extern uint32 WAIT_EVENT_NEON_WAL_DL;
(errmsg(NEON_TAG "[shard %d] " fmt, shard_no, ##__VA_ARGS__), \
errhidestmt(true), errhidecontext(true), errposition(0), internalerrposition(0)))
#define InfiniteXLogRecPtr UINT64_MAX
extern void pg_init_libpagestore(void);
extern void pg_init_walproposer(void);
@@ -72,21 +71,4 @@ extern PGDLLEXPORT void WalProposerSync(int argc, char *argv[]);
extern PGDLLEXPORT void WalProposerMain(Datum main_arg);
extern PGDLLEXPORT void LogicalSlotsMonitorMain(Datum main_arg);
extern void LfcShmemRequest(void);
extern void PagestoreShmemRequest(void);
extern void RelsizeCacheShmemRequest(void);
extern void CommunicatorShmemRequest(void);
extern void WalproposerShmemRequest(void);
extern void LwLsnCacheShmemRequest(void);
extern void NeonPerfCountersShmemRequest(void);
extern void LfcShmemInit(void);
extern void PagestoreShmemInit(void);
extern void RelsizeCacheShmemInit(void);
extern void CommunicatorShmemInit(void);
extern void WalproposerShmemInit(void);
extern void LwLsnCacheShmemInit(void);
extern void NeonPerfCountersShmemInit(void);
#endif /* NEON_H */

View File

@@ -1,6 +1,5 @@
#include "postgres.h"
#include "neon.h"
#include "neon_lwlsncache.h"
#include "miscadmin.h"
@@ -82,6 +81,14 @@ static set_max_lwlsn_hook_type prev_set_max_lwlsn_hook = NULL;
static set_lwlsn_relation_hook_type prev_set_lwlsn_relation_hook = NULL;
static set_lwlsn_db_hook_type prev_set_lwlsn_db_hook = NULL;
static shmem_startup_hook_type prev_shmem_startup_hook;
#if PG_VERSION_NUM >= 150000
static shmem_request_hook_type prev_shmem_request_hook;
#endif
static void shmemrequest(void);
static void shmeminit(void);
static void neon_set_max_lwlsn(XLogRecPtr lsn);
void
@@ -92,6 +99,16 @@ init_lwlsncache(void)
lwlc_register_gucs();
prev_shmem_startup_hook = shmem_startup_hook;
shmem_startup_hook = shmeminit;
#if PG_VERSION_NUM >= 150000
prev_shmem_request_hook = shmem_request_hook;
shmem_request_hook = shmemrequest;
#else
shmemrequest();
#endif
prev_set_lwlsn_block_range_hook = set_lwlsn_block_range_hook;
set_lwlsn_block_range_hook = neon_set_lwlsn_block_range;
prev_set_lwlsn_block_v_hook = set_lwlsn_block_v_hook;
@@ -107,19 +124,20 @@ init_lwlsncache(void)
}
void
LwLsnCacheShmemRequest(void)
{
static void shmemrequest(void) {
Size requested_size = sizeof(LwLsnCacheCtl);
requested_size += hash_estimate_size(lwlsn_cache_size, sizeof(LastWrittenLsnCacheEntry));
RequestAddinShmemSpace(requested_size);
#if PG_VERSION_NUM >= 150000
if (prev_shmem_request_hook)
prev_shmem_request_hook();
#endif
}
void
LwLsnCacheShmemInit(void)
{
static void shmeminit(void) {
static HASHCTL info;
bool found;
if (lwlsn_cache_size > 0)
@@ -139,6 +157,9 @@ LwLsnCacheShmemInit(void)
}
dlist_init(&LwLsnCache->lastWrittenLsnLRU);
LwLsnCache->maxLastWrittenLsn = GetRedoRecPtr();
if (prev_shmem_startup_hook) {
prev_shmem_startup_hook();
}
}
/*

View File

@@ -17,21 +17,22 @@
#include "storage/shmem.h"
#include "utils/builtins.h"
#include "neon.h"
#include "neon_perf_counters.h"
#include "neon_pgversioncompat.h"
neon_per_backend_counters *neon_per_backend_counters_shared;
void
NeonPerfCountersShmemRequest(void)
Size
NeonPerfCountersShmemSize(void)
{
Size size = mul_size(NUM_NEON_PERF_COUNTER_SLOTS, sizeof(neon_per_backend_counters));
RequestAddinShmemSpace(size);
Size size = 0;
size = add_size(size, mul_size(NUM_NEON_PERF_COUNTER_SLOTS,
sizeof(neon_per_backend_counters)));
return size;
}
void
NeonPerfCountersShmemInit(void)
{

View File

@@ -250,6 +250,7 @@ extern const f_smgr *smgr_neon(ProcNumber backend, NRelFileInfo rinfo);
extern void smgr_init_neon(void);
extern void readahead_buffer_resize(int newsize, void *extra);
/*
* LSN values associated with each request to the pageserver
*/

View File

@@ -675,7 +675,7 @@ neon_get_request_lsns(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
* always have that problem as the can always lag behind the
* primary, but for the primary we can avoid it by always
* requesting the latest page, by setting request LSN to
* InfiniteXLogRecPtr.
* UINT64_MAX.
*
* effective_request_lsn is used to check that received response is still valid.
* In case of primary node it is last written LSN. Originally we used flush_lsn here,
@@ -703,7 +703,7 @@ neon_get_request_lsns(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
* The problem can be fixed by callingGetFlushRecPtr() before checking if the page is in the buffer cache.
* But you can't do that within smgrprefetch(), would need to modify the caller.
*/
result->request_lsn = InfiniteXLogRecPtr;
result->request_lsn = UINT64_MAX;
result->not_modified_since = last_written_lsn;
result->effective_request_lsn = last_written_lsn;
}
@@ -2158,7 +2158,7 @@ neon_read_slru_segment(SMgrRelation reln, const char* path, int segno, void* buf
request_lsn = nm_adjust_lsn(request_lsn);
}
else
request_lsn = InfiniteXLogRecPtr;
request_lsn = UINT64_MAX;
/*
* GetRedoStartLsn() returns LSN of the basebackup. We know that the SLRU

View File

@@ -10,7 +10,6 @@
*/
#include "postgres.h"
#include "neon.h"
#include "neon_pgversioncompat.h"
#include "pagestore_client.h"
@@ -50,23 +49,32 @@ typedef struct
* algorithm */
} RelSizeHashControl;
static HTAB *relsize_hash;
static LWLockId relsize_lock;
static int relsize_hash_size;
static RelSizeHashControl* relsize_ctl;
static shmem_startup_hook_type prev_shmem_startup_hook = NULL;
#if PG_VERSION_NUM >= 150000
static shmem_request_hook_type prev_shmem_request_hook = NULL;
static void relsize_shmem_request(void);
#endif
/*
* Size of a cache entry is 36 bytes. So this default will take about 2.3 MB,
* which seems reasonable.
*/
#define DEFAULT_RELSIZE_HASH_SIZE (64 * 1024)
static HTAB *relsize_hash;
static LWLockId relsize_lock;
static int relsize_hash_size = DEFAULT_RELSIZE_HASH_SIZE;
static RelSizeHashControl* relsize_ctl;
void
RelsizeCacheShmemInit(void)
static void
neon_smgr_shmem_startup(void)
{
static HASHCTL info;
bool found;
if (prev_shmem_startup_hook)
prev_shmem_startup_hook();
LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);
relsize_ctl = (RelSizeHashControl *) ShmemInitStruct("relsize_hash", sizeof(RelSizeHashControl), &found);
if (!found)
{
@@ -77,6 +85,7 @@ RelsizeCacheShmemInit(void)
relsize_hash_size, relsize_hash_size,
&info,
HASH_ELEM | HASH_BLOBS);
LWLockRelease(AddinShmemInitLock);
relsize_ctl->size = 0;
relsize_ctl->hits = 0;
relsize_ctl->misses = 0;
@@ -233,15 +242,34 @@ relsize_hash_init(void)
PGC_POSTMASTER,
0,
NULL, NULL, NULL);
if (relsize_hash_size > 0)
{
#if PG_VERSION_NUM >= 150000
prev_shmem_request_hook = shmem_request_hook;
shmem_request_hook = relsize_shmem_request;
#else
RequestAddinShmemSpace(hash_estimate_size(relsize_hash_size, sizeof(RelSizeEntry)));
RequestNamedLWLockTranche("neon_relsize", 1);
#endif
prev_shmem_startup_hook = shmem_startup_hook;
shmem_startup_hook = neon_smgr_shmem_startup;
}
}
#if PG_VERSION_NUM >= 150000
/*
* shmem_request hook: request additional shared resources. We'll allocate or
* attach to the shared resources in neon_smgr_shmem_startup().
*/
void
RelsizeCacheShmemRequest(void)
static void
relsize_shmem_request(void)
{
if (prev_shmem_request_hook)
prev_shmem_request_hook();
RequestAddinShmemSpace(sizeof(RelSizeHashControl) + hash_estimate_size(relsize_hash_size, sizeof(RelSizeEntry)));
RequestNamedLWLockTranche("neon_relsize", 1);
}
#endif

View File

@@ -83,8 +83,10 @@ static XLogRecPtr standby_flush_lsn = InvalidXLogRecPtr;
static XLogRecPtr standby_apply_lsn = InvalidXLogRecPtr;
static HotStandbyFeedback agg_hs_feedback;
static void nwp_shmem_startup_hook(void);
static void nwp_register_gucs(void);
static void assign_neon_safekeepers(const char *newval, void *extra);
static void nwp_prepare_shmem(void);
static uint64 backpressure_lag_impl(void);
static uint64 startup_backpressure_wrap(void);
static bool backpressure_throttling_impl(void);
@@ -97,6 +99,11 @@ static TimestampTz walprop_pg_get_current_timestamp(WalProposer *wp);
static void walprop_pg_load_libpqwalreceiver(void);
static process_interrupts_callback_t PrevProcessInterruptsCallback = NULL;
static shmem_startup_hook_type prev_shmem_startup_hook_type;
#if PG_VERSION_NUM >= 150000
static shmem_request_hook_type prev_shmem_request_hook = NULL;
static void walproposer_shmem_request(void);
#endif
static void WalproposerShmemInit_SyncSafekeeper(void);
@@ -186,6 +193,8 @@ pg_init_walproposer(void)
nwp_register_gucs();
nwp_prepare_shmem();
delay_backend_us = &startup_backpressure_wrap;
PrevProcessInterruptsCallback = ProcessInterruptsCallback;
ProcessInterruptsCallback = backpressure_throttling_impl;
@@ -473,11 +482,12 @@ WalproposerShmemSize(void)
return sizeof(WalproposerShmemState);
}
void
static bool
WalproposerShmemInit(void)
{
bool found;
LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);
walprop_shared = ShmemInitStruct("Walproposer shared state",
sizeof(WalproposerShmemState),
&found);
@@ -494,6 +504,9 @@ WalproposerShmemInit(void)
pg_atomic_init_u32(&walprop_shared->wal_rate_limiter.should_limit, 0);
/* END_HADRON */
}
LWLockRelease(AddinShmemInitLock);
return found;
}
static void
@@ -596,15 +609,42 @@ walprop_register_bgworker(void)
/* shmem handling */
static void
nwp_prepare_shmem(void)
{
#if PG_VERSION_NUM >= 150000
prev_shmem_request_hook = shmem_request_hook;
shmem_request_hook = walproposer_shmem_request;
#else
RequestAddinShmemSpace(WalproposerShmemSize());
#endif
prev_shmem_startup_hook_type = shmem_startup_hook;
shmem_startup_hook = nwp_shmem_startup_hook;
}
#if PG_VERSION_NUM >= 150000
/*
* shmem_request hook: request additional shared resources. We'll allocate or
* attach to the shared resources in WalproposerShmemInit().
* attach to the shared resources in nwp_shmem_startup_hook().
*/
void
WalproposerShmemRequest(void)
static void
walproposer_shmem_request(void)
{
if (prev_shmem_request_hook)
prev_shmem_request_hook();
RequestAddinShmemSpace(WalproposerShmemSize());
}
#endif
static void
nwp_shmem_startup_hook(void)
{
if (prev_shmem_startup_hook_type)
prev_shmem_startup_hook_type();
WalproposerShmemInit();
}
WalproposerShmemState *
GetWalpropShmemState(void)

View File

@@ -120,6 +120,7 @@ workspace_hack.workspace = true
[dev-dependencies]
assert-json-diff.workspace = true
camino-tempfile.workspace = true
criterion.workspace = true
fallible-iterator.workspace = true
flate2.workspace = true
tokio-tungstenite.workspace = true
@@ -130,3 +131,8 @@ walkdir.workspace = true
rand_distr = "0.4"
tokio-postgres.workspace = true
tracing-test = "0.2"
[[bench]]
name = "logging"
harness = false

127
proxy/benches/logging.rs Normal file
View File

@@ -0,0 +1,127 @@
use std::io;
use criterion::{Criterion, criterion_group, criterion_main};
use proxy::logging::{Clock, JsonLoggingLayer};
use tracing_subscriber::prelude::*;
struct DevNullWriter;
impl proxy::logging::MakeWriter for DevNullWriter {
fn make_writer(&self) -> impl io::Write {
DevNullWriter
}
}
impl<'a> tracing_subscriber::fmt::MakeWriter<'a> for DevNullWriter {
type Writer = DevNullWriter;
fn make_writer(&'a self) -> Self::Writer {
DevNullWriter
}
}
impl io::Write for DevNullWriter {
#[inline]
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
Ok(criterion::black_box(buf).len())
}
#[inline(always)]
fn flush(&mut self) -> io::Result<()> {
Ok(())
}
}
struct FixedClock;
impl Clock for FixedClock {
fn now(&self) -> chrono::DateTime<chrono::Utc> {
const { chrono::DateTime::from_timestamp_nanos(1747859990_000_000_000).to_utc() }
}
}
pub fn bench_logging(c: &mut Criterion) {
c.bench_function("text fmt current", |b| {
let registry = tracing_subscriber::Registry::default().with(
tracing_subscriber::fmt::layer()
.with_ansi(false)
.with_target(false)
.with_writer(DevNullWriter),
);
tracing::subscriber::with_default(registry, || {
tracing::info_span!("span1", a = 42, b = true, c = "string").in_scope(|| {
tracing::info_span!("span2", a = 42, b = true, c = "string").in_scope(|| {
b.iter(|| {
tracing::error!(a = 42, b = true, c = "string", "message field");
})
});
});
});
});
c.bench_function("text fmt full", |b| {
let registry = tracing_subscriber::Registry::default().with(
tracing_subscriber::fmt::layer()
.with_level(true)
.with_file(true)
.with_line_number(true)
.with_target(true)
.with_thread_ids(true)
.with_writer(DevNullWriter),
);
tracing::subscriber::with_default(registry, || {
tracing::info_span!("span1", a = 42, b = true, c = "string").in_scope(|| {
tracing::info_span!("span2", a = 42, b = true, c = "string").in_scope(|| {
b.iter(|| {
tracing::error!(a = 42, b = true, c = "string", "message field");
})
});
});
});
});
c.bench_function("json fmt", |b| {
let registry = tracing_subscriber::Registry::default().with(
tracing_subscriber::fmt::layer()
.with_level(true)
.with_file(true)
.with_line_number(true)
.with_target(true)
.with_thread_ids(true)
.with_writer(DevNullWriter)
.json(),
);
tracing::subscriber::with_default(registry, || {
tracing::info_span!("span1", a = 42, b = true, c = "string").in_scope(|| {
tracing::info_span!("span2", a = 42, b = true, c = "string").in_scope(|| {
b.iter(|| {
tracing::error!(a = 42, b = true, c = "string", "message field");
})
});
});
});
});
c.bench_function("json custom", |b| {
let registry = tracing_subscriber::Registry::default().with(JsonLoggingLayer::new(
FixedClock,
DevNullWriter,
&["a"],
));
tracing::subscriber::with_default(registry, || {
tracing::info_span!("span1", a = 42, b = true, c = "string").in_scope(|| {
tracing::info_span!("span2", a = 42, b = true, c = "string").in_scope(|| {
b.iter(|| {
tracing::error!(a = 42, b = true, c = "string", "message field");
})
});
});
});
});
}
criterion_group!(benches, bench_logging);
criterion_main!(benches);

View File

@@ -93,7 +93,7 @@ mod ext;
mod http;
mod intern;
mod jemalloc;
mod logging;
pub mod logging;
mod metrics;
mod parse;
mod pglb;

View File

@@ -148,11 +148,11 @@ impl LogFormat {
}
}
trait MakeWriter {
pub trait MakeWriter {
fn make_writer(&self) -> impl io::Write;
}
struct StderrWriter {
pub struct StderrWriter {
stderr: io::Stderr,
}
@@ -164,11 +164,11 @@ impl MakeWriter for StderrWriter {
}
// TODO: move into separate module or even separate crate.
trait Clock {
pub trait Clock {
fn now(&self) -> DateTime<Utc>;
}
struct RealClock;
pub struct RealClock;
impl Clock for RealClock {
#[inline]
@@ -203,7 +203,7 @@ type CallsiteMap<T> =
papaya::HashMap<callsite::Identifier, T, std::hash::BuildHasherDefault<rustc_hash::FxHasher>>;
/// Implements tracing layer to handle events specific to logging.
struct JsonLoggingLayer<C: Clock, W: MakeWriter> {
pub struct JsonLoggingLayer<C: Clock, W: MakeWriter> {
clock: C,
writer: W,
@@ -217,7 +217,7 @@ struct JsonLoggingLayer<C: Clock, W: MakeWriter> {
}
impl<C: Clock, W: MakeWriter> JsonLoggingLayer<C, W> {
fn new(clock: C, writer: W, extract_fields: &'static [&'static str]) -> Self {
pub fn new(clock: C, writer: W, extract_fields: &'static [&'static str]) -> Self {
JsonLoggingLayer {
clock,
skipped_field_indices: CallsiteMap::default(),

View File

@@ -110,7 +110,7 @@ where
debug!(error = ?err, COULD_NOT_CONNECT);
let node_info = if !node_info.cached() || !err.should_retry_wake_compute() {
// If we just recieved this from cplane and didn't get it from cache, we shouldn't retry.
// If we just received this from cplane and not from the cache, we shouldn't retry.
// Do not need to retrieve a new node_info, just return the old one.
if !should_retry(&err, num_retries, compute.retry) {
Metrics::get().proxy.retries_metric.observe(

View File

@@ -58,6 +58,7 @@ metrics.workspace = true
pem.workspace = true
postgres_backend.workspace = true
postgres_ffi.workspace = true
postgres_ffi_types.workspace = true
postgres_versioninfo.workspace = true
pq_proto.workspace = true
remote_storage.workspace = true

View File

@@ -12,7 +12,8 @@ use futures::FutureExt;
use itertools::Itertools;
use parking_lot::Mutex;
use postgres_backend::{CopyStreamHandlerEnd, PostgresBackend, PostgresBackendReader, QueryError};
use postgres_ffi::{MAX_SEND_SIZE, PgMajorVersion, TimestampTz, get_current_timestamp};
use postgres_ffi::{MAX_SEND_SIZE, PgMajorVersion, get_current_timestamp};
use postgres_ffi_types::TimestampTz;
use pq_proto::{BeMessage, WalSndKeepAlive, XLogDataBody};
use safekeeper_api::Term;
use safekeeper_api::models::{

View File

@@ -728,7 +728,7 @@ class NeonEnvBuilder:
# NB: neon_local rewrites postgresql.conf on each start based on neon_local config. No need to patch it.
# However, in this new NeonEnv, the pageservers and safekeepers listen on different ports, and the storage
# controller will currently reject re-attach requests from them because the NodeMetadata isn't identical.
# So, from_repo_dir patches up the the storcon database.
# So, from_repo_dir patches up the storcon database.
patch_script_path = self.repo_dir / "storage_controller_db.startup.sql"
assert not patch_script_path.exists()
patch_script = ""

View File

@@ -24,10 +24,7 @@ def test_local_corruption(neon_env_builder: NeonEnvBuilder):
[
".*get_values_reconstruct_data for layer .*",
".*could not find data for key.*",
".*is not active. Current state: Broken.*",
".*will not become active. Current state: Broken.*",
".*failed to load metadata.*",
".*load failed.*load local timeline.*",
".*: layer load failed, assuming permanent failure:.*",
".*failed to get checkpoint bytes.*",
".*failed to get control bytes.*",

View File

@@ -687,7 +687,7 @@ def test_sharding_compaction(
for _i in range(0, 10):
# Each of these does some writes then a checkpoint: because we set image_creation_threshold to 1,
# these should result in image layers each time we write some data into a shard, and also shards
# recieving less data hitting their "empty image layer" path (wherre they should skip writing the layer,
# receiving less data hitting their "empty image layer" path (where they should skip writing the layer,
# rather than asserting)
workload.churn_rows(64)

View File

@@ -76,7 +76,6 @@ def test_tenants_normal_work(neon_env_builder: NeonEnvBuilder):
neon_env_builder.num_safekeepers = 3
env = neon_env_builder.init_start()
"""Tests tenants with and without wal acceptors"""
tenant_1, _ = env.create_tenant()
tenant_2, _ = env.create_tenant()