mirror of
https://github.com/neondatabase/neon.git
synced 2026-06-24 07:40:36 +00:00
Compare commits
3 Commits
min_prefet
...
cloneable/
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
ef737e7d7c | ||
|
|
5c934efb29 | ||
|
|
5c9c3b3317 |
4
Cargo.lock
generated
4
Cargo.lock
generated
@@ -5303,6 +5303,7 @@ dependencies = [
|
||||
"clashmap",
|
||||
"compute_api",
|
||||
"consumption_metrics",
|
||||
"criterion",
|
||||
"ecdsa 0.16.9",
|
||||
"ed25519-dalek",
|
||||
"env_logger",
|
||||
@@ -6211,6 +6212,7 @@ dependencies = [
|
||||
"postgres-protocol",
|
||||
"postgres_backend",
|
||||
"postgres_ffi",
|
||||
"postgres_ffi_types",
|
||||
"postgres_versioninfo",
|
||||
"pprof",
|
||||
"pq_proto",
|
||||
@@ -6255,7 +6257,7 @@ dependencies = [
|
||||
"anyhow",
|
||||
"const_format",
|
||||
"pageserver_api",
|
||||
"postgres_ffi",
|
||||
"postgres_ffi_types",
|
||||
"postgres_versioninfo",
|
||||
"pq_proto",
|
||||
"serde",
|
||||
|
||||
@@ -1286,9 +1286,7 @@ impl ComputeNode {
|
||||
|
||||
// In case of error, log and fail the check, but don't crash.
|
||||
// We're playing it safe because these errors could be transient
|
||||
// and we don't yet retry. Also being careful here allows us to
|
||||
// be backwards compatible with safekeepers that don't have the
|
||||
// TIMELINE_STATUS API yet.
|
||||
// and we don't yet retry.
|
||||
if responses.len() < quorum {
|
||||
error!(
|
||||
"failed sync safekeepers check {:?} {:?} {:?}",
|
||||
|
||||
@@ -464,7 +464,7 @@ impl Endpoint {
|
||||
conf.append("max_connections", "100");
|
||||
conf.append("wal_level", "logical");
|
||||
// wal_sender_timeout is the maximum time to wait for WAL replication.
|
||||
// It also defines how often the walreciever will send a feedback message to the wal sender.
|
||||
// It also defines how often the walreceiver will send a feedback message to the wal sender.
|
||||
conf.append("wal_sender_timeout", "5s");
|
||||
conf.append("listen_addresses", &self.pg_address.ip().to_string());
|
||||
conf.append("port", &self.pg_address.port().to_string());
|
||||
|
||||
@@ -75,7 +75,7 @@ CLI examples:
|
||||
* AWS S3 : `env AWS_ACCESS_KEY_ID='SOMEKEYAAAAASADSAH*#' AWS_SECRET_ACCESS_KEY='SOMEsEcReTsd292v' ${PAGESERVER_BIN} -c "remote_storage={bucket_name='some-sample-bucket',bucket_region='eu-north-1', prefix_in_bucket='/test_prefix/'}"`
|
||||
|
||||
For Amazon AWS S3, a key id and secret access key could be located in `~/.aws/credentials` if awscli was ever configured to work with the desired bucket, on the AWS Settings page for a certain user. Also note, that the bucket names does not contain any protocols when used on AWS.
|
||||
For local S3 installations, refer to the their documentation for name format and credentials.
|
||||
For local S3 installations, refer to their documentation for name format and credentials.
|
||||
|
||||
Similar to other pageserver settings, toml config file can be used to configure either of the storages as backup targets.
|
||||
Required sections are:
|
||||
|
||||
@@ -110,7 +110,6 @@ fn main() -> anyhow::Result<()> {
|
||||
.allowlist_type("XLogRecPtr")
|
||||
.allowlist_type("XLogSegNo")
|
||||
.allowlist_type("TimeLineID")
|
||||
.allowlist_type("TimestampTz")
|
||||
.allowlist_type("MultiXactId")
|
||||
.allowlist_type("MultiXactOffset")
|
||||
.allowlist_type("MultiXactStatus")
|
||||
|
||||
@@ -227,8 +227,7 @@ pub mod walrecord;
|
||||
// Export some widely used datatypes that are unlikely to change across Postgres versions
|
||||
pub use v14::bindings::{
|
||||
BlockNumber, CheckPoint, ControlFileData, MultiXactId, OffsetNumber, Oid, PageHeaderData,
|
||||
RepOriginId, TimeLineID, TimestampTz, TransactionId, XLogRecPtr, XLogRecord, XLogSegNo, uint32,
|
||||
uint64,
|
||||
RepOriginId, TimeLineID, TransactionId, XLogRecPtr, XLogRecord, XLogSegNo, uint32, uint64,
|
||||
};
|
||||
// Likewise for these, although the assumption that these don't change is a little more iffy.
|
||||
pub use v14::bindings::{MultiXactOffset, MultiXactStatus};
|
||||
|
||||
@@ -4,13 +4,14 @@
|
||||
//! TODO: Generate separate types for each supported PG version
|
||||
|
||||
use bytes::{Buf, Bytes};
|
||||
use postgres_ffi_types::TimestampTz;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use utils::bin_ser::DeserializeError;
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
use crate::{
|
||||
BLCKSZ, BlockNumber, MultiXactId, MultiXactOffset, MultiXactStatus, Oid, PgMajorVersion,
|
||||
RepOriginId, TimestampTz, TransactionId, XLOG_SIZE_OF_XLOG_RECORD, XLogRecord, pg_constants,
|
||||
RepOriginId, TransactionId, XLOG_SIZE_OF_XLOG_RECORD, XLogRecord, pg_constants,
|
||||
};
|
||||
|
||||
#[repr(C)]
|
||||
@@ -863,7 +864,8 @@ pub mod v17 {
|
||||
XlHeapDelete, XlHeapInsert, XlHeapLock, XlHeapMultiInsert, XlHeapUpdate, XlParameterChange,
|
||||
rm_neon,
|
||||
};
|
||||
pub use crate::{TimeLineID, TimestampTz};
|
||||
pub use crate::TimeLineID;
|
||||
pub use postgres_ffi_types::TimestampTz;
|
||||
|
||||
#[repr(C)]
|
||||
#[derive(Debug)]
|
||||
|
||||
@@ -9,10 +9,11 @@
|
||||
|
||||
use super::super::waldecoder::WalStreamDecoder;
|
||||
use super::bindings::{
|
||||
CheckPoint, ControlFileData, DBState_DB_SHUTDOWNED, FullTransactionId, TimeLineID, TimestampTz,
|
||||
CheckPoint, ControlFileData, DBState_DB_SHUTDOWNED, FullTransactionId, TimeLineID,
|
||||
XLogLongPageHeaderData, XLogPageHeaderData, XLogRecPtr, XLogRecord, XLogSegNo, XLOG_PAGE_MAGIC,
|
||||
MY_PGVERSION
|
||||
};
|
||||
use postgres_ffi_types::TimestampTz;
|
||||
use super::wal_generator::LogicalMessageGenerator;
|
||||
use crate::pg_constants;
|
||||
use crate::PG_TLI;
|
||||
|
||||
@@ -11,3 +11,4 @@ pub mod forknum;
|
||||
|
||||
pub type Oid = u32;
|
||||
pub type RepOriginId = u16;
|
||||
pub type TimestampTz = i64;
|
||||
|
||||
@@ -9,7 +9,7 @@ anyhow.workspace = true
|
||||
const_format.workspace = true
|
||||
serde.workspace = true
|
||||
serde_json.workspace = true
|
||||
postgres_ffi.workspace = true
|
||||
postgres_ffi_types.workspace = true
|
||||
postgres_versioninfo.workspace = true
|
||||
pq_proto.workspace = true
|
||||
tokio.workspace = true
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
use std::net::SocketAddr;
|
||||
|
||||
use pageserver_api::shard::ShardIdentity;
|
||||
use postgres_ffi::TimestampTz;
|
||||
use postgres_ffi_types::TimestampTz;
|
||||
use postgres_versioninfo::PgVersionId;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tokio::time::Instant;
|
||||
|
||||
@@ -2,7 +2,8 @@
|
||||
|
||||
use bytes::Bytes;
|
||||
use postgres_ffi::walrecord::{MultiXactMember, describe_postgres_wal_record};
|
||||
use postgres_ffi::{MultiXactId, MultiXactOffset, TimestampTz, TransactionId};
|
||||
use postgres_ffi::{MultiXactId, MultiXactOffset, TransactionId};
|
||||
use postgres_ffi_types::TimestampTz;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use utils::bin_ser::DeserializeError;
|
||||
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
//! The validator is responsible for validating DeletionLists for execution,
|
||||
//! based on whethe the generation in the DeletionList is still the latest
|
||||
//! based on whether the generation in the DeletionList is still the latest
|
||||
//! generation for a tenant.
|
||||
//!
|
||||
//! The purpose of validation is to ensure split-brain safety in the cluster
|
||||
|
||||
@@ -25,9 +25,9 @@ use pageserver_api::keyspace::{KeySpaceRandomAccum, SparseKeySpace};
|
||||
use pageserver_api::models::RelSizeMigration;
|
||||
use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind};
|
||||
use pageserver_api::shard::ShardIdentity;
|
||||
use postgres_ffi::{BLCKSZ, PgMajorVersion, TimestampTz, TransactionId};
|
||||
use postgres_ffi::{BLCKSZ, PgMajorVersion, TransactionId};
|
||||
use postgres_ffi_types::forknum::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM};
|
||||
use postgres_ffi_types::{Oid, RepOriginId};
|
||||
use postgres_ffi_types::{Oid, RepOriginId, TimestampTz};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use strum::IntoEnumIterator;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
//! An utilization metric which is used to decide on which pageserver to put next tenant.
|
||||
//!
|
||||
//! The metric is exposed via `GET /v1/utilization`. Refer and maintain it's openapi spec as the
|
||||
//! The metric is exposed via `GET /v1/utilization`. Refer and maintain its openapi spec as the
|
||||
//! truth.
|
||||
|
||||
use std::path::Path;
|
||||
|
||||
@@ -32,9 +32,10 @@ use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind};
|
||||
use pageserver_api::shard::ShardIdentity;
|
||||
use postgres_ffi::walrecord::*;
|
||||
use postgres_ffi::{
|
||||
PgMajorVersion, TimestampTz, TransactionId, dispatch_pgversion, enum_pgversion,
|
||||
enum_pgversion_dispatch, fsm_logical_to_physical, pg_constants,
|
||||
PgMajorVersion, TransactionId, dispatch_pgversion, enum_pgversion, enum_pgversion_dispatch,
|
||||
fsm_logical_to_physical, pg_constants,
|
||||
};
|
||||
use postgres_ffi_types::TimestampTz;
|
||||
use postgres_ffi_types::forknum::{FSM_FORKNUM, INIT_FORKNUM, MAIN_FORKNUM, VISIBILITYMAP_FORKNUM};
|
||||
use tracing::*;
|
||||
use utils::bin_ser::{DeserializeError, SerializeError};
|
||||
@@ -1069,7 +1070,7 @@ impl WalIngest {
|
||||
// NB: In PostgreSQL, the next-multi-xid stored in the control file is allowed to
|
||||
// go to 0, and it's fixed up by skipping to FirstMultiXactId in functions that
|
||||
// read it, like GetNewMultiXactId(). This is different from how nextXid is
|
||||
// incremented! nextXid skips over < FirstNormalTransactionId when the the value
|
||||
// incremented! nextXid skips over < FirstNormalTransactionId when the value
|
||||
// is stored, so it's never 0 in a checkpoint.
|
||||
//
|
||||
// I don't know why it's done that way, it seems less error-prone to skip over 0
|
||||
|
||||
@@ -38,8 +38,6 @@ DATA = \
|
||||
neon--1.3--1.4.sql \
|
||||
neon--1.4--1.5.sql \
|
||||
neon--1.5--1.6.sql \
|
||||
neon--1.6--1.7.sql \
|
||||
neon--1.7--1.6.sql \
|
||||
neon--1.6--1.5.sql \
|
||||
neon--1.5--1.4.sql \
|
||||
neon--1.4--1.3.sql \
|
||||
|
||||
@@ -54,7 +54,6 @@
|
||||
*/
|
||||
#include "postgres.h"
|
||||
|
||||
#include "access/twophase.h"
|
||||
#include "access/xlog.h"
|
||||
#include "access/xlogdefs.h"
|
||||
#include "access/xlog_internal.h"
|
||||
@@ -65,7 +64,6 @@
|
||||
#include "miscadmin.h"
|
||||
#include "port/pg_iovec.h"
|
||||
#include "postmaster/interrupt.h"
|
||||
#include "postmaster/postmaster.h"
|
||||
#include "replication/walsender.h"
|
||||
#include "storage/ipc.h"
|
||||
#include "utils/timeout.h"
|
||||
@@ -77,18 +75,11 @@
|
||||
#include "neon_perf_counters.h"
|
||||
#include "pagestore_client.h"
|
||||
|
||||
#if PG_MAJORVERSION_NUM >= 17
|
||||
#include "storage/procnumber.h"
|
||||
#else
|
||||
#define MyProcNumber MyProc->pgprocno
|
||||
#endif
|
||||
|
||||
|
||||
#if PG_MAJORVERSION_NUM >= 15
|
||||
#if PG_VERSION_NUM >= 150000
|
||||
#include "access/xlogrecovery.h"
|
||||
#endif
|
||||
|
||||
#if PG_MAJORVERSION_NUM < 16
|
||||
#if PG_VERSION_NUM < 160000
|
||||
typedef PGAlignedBlock PGIOAlignedBlock;
|
||||
#endif
|
||||
|
||||
@@ -303,15 +294,6 @@ static PrefetchState *MyPState;
|
||||
|
||||
static process_interrupts_callback_t prev_interrupt_cb;
|
||||
|
||||
/*
|
||||
* Array in shared memory each cell of which contains minimal in-flight request LSN sent to PS by the backend which procno is
|
||||
* used as index in this array. This array is initially filled with InfiniteXlogRecPtr (UINT64_MAX) so if backend
|
||||
* didn't send any request to PS, then this value doesn't effect global min.
|
||||
*
|
||||
* We support only 64-bit platforms and so assume that access to array elements is atomic and no any synchronization is needed.
|
||||
*/
|
||||
static XLogRecPtr* minPrefetchLsn;
|
||||
|
||||
static bool compact_prefetch_buffers(void);
|
||||
static void consume_prefetch_responses(void);
|
||||
static uint64 prefetch_register_bufferv(BufferTag tag, neon_request_lsns *frlsns,
|
||||
@@ -334,41 +316,6 @@ pg_init_communicator(void)
|
||||
ProcessInterruptsCallback = communicator_processinterrupts;
|
||||
}
|
||||
|
||||
static Size
|
||||
CommunicatorShmemSize(void)
|
||||
{
|
||||
#if PG_MAJORVERSION_NUM >= 15
|
||||
Assert(MaxBackends != 0);
|
||||
return (MaxBackends + NUM_AUXILIARY_PROCS + max_prepared_xacts) * sizeof(XLogRecPtr);
|
||||
#else
|
||||
return (MAX_BACKENDS + NUM_AUXILIARY_PROCS + max_prepared_xacts) * sizeof(XLogRecPtr);
|
||||
#endif
|
||||
}
|
||||
|
||||
void
|
||||
CommunicatorShmemRequest(void)
|
||||
{
|
||||
RequestAddinShmemSpace(CommunicatorShmemSize());
|
||||
}
|
||||
|
||||
void
|
||||
CommunicatorShmemInit(void)
|
||||
{
|
||||
bool found;
|
||||
minPrefetchLsn = (XLogRecPtr*)ShmemInitStruct("Communicator shared state",
|
||||
CommunicatorShmemSize(),
|
||||
&found);
|
||||
if (!found)
|
||||
{
|
||||
/*
|
||||
* Fill with InfiniteXLogRecPtr (UINT64_MAX).
|
||||
* If backend didn't send any requests to PS, then InfiniteXLogRecPtr doesn't affect global minimal value.
|
||||
*/
|
||||
memset(minPrefetchLsn, 0xFF, CommunicatorShmemSize());
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
static bool
|
||||
compact_prefetch_buffers(void)
|
||||
{
|
||||
@@ -507,20 +454,6 @@ check_getpage_response(PrefetchRequest* slot, NeonResponse* resp)
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Update min in-flight prefetch LSN for this backend.
|
||||
*/
|
||||
static void
|
||||
update_min_prefetch_lsn(uint64 ring_index)
|
||||
{
|
||||
if (ring_index + 1 < MyPState->ring_unused)
|
||||
{
|
||||
PrefetchRequest* next_slot = GetPrfSlot(ring_index + 1);
|
||||
Assert(minPrefetchLsn[MyProcNumber] <= next_slot->request_lsns.request_lsn);
|
||||
minPrefetchLsn[MyProcNumber] = next_slot->request_lsns.request_lsn;
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* If there might be responses still in the TCP buffer, then we should try to
|
||||
* use those, to reduce any TCP backpressure on the OS/PS side.
|
||||
@@ -545,9 +478,8 @@ communicator_prefetch_pump_state(void)
|
||||
NeonResponse *response;
|
||||
PrefetchRequest *slot;
|
||||
MemoryContext old;
|
||||
uint64 my_ring_index = MyPState->ring_receive;
|
||||
|
||||
slot = GetPrfSlot(my_ring_index);
|
||||
slot = GetPrfSlot(MyPState->ring_receive);
|
||||
|
||||
old = MemoryContextSwitchTo(MyPState->errctx);
|
||||
response = page_server->try_receive(slot->shard_no);
|
||||
@@ -556,19 +488,17 @@ communicator_prefetch_pump_state(void)
|
||||
if (response == NULL)
|
||||
break;
|
||||
|
||||
update_min_prefetch_lsn(my_ring_index);
|
||||
|
||||
check_getpage_response(slot, response);
|
||||
|
||||
/* The slot should still be valid */
|
||||
if (slot->status != PRFS_REQUESTED ||
|
||||
slot->response != NULL ||
|
||||
slot->my_ring_index != my_ring_index)
|
||||
slot->my_ring_index != MyPState->ring_receive)
|
||||
{
|
||||
neon_shard_log(slot->shard_no, PANIC,
|
||||
"Incorrect prefetch slot state after receive: status=%d response=%p my=" UINT64_FORMAT " receive=" UINT64_FORMAT "",
|
||||
slot->status, slot->response,
|
||||
slot->my_ring_index, my_ring_index);
|
||||
slot->my_ring_index, MyPState->ring_receive);
|
||||
}
|
||||
/* update prefetch state */
|
||||
MyPState->n_responses_buffered += 1;
|
||||
@@ -735,9 +665,6 @@ consume_prefetch_responses(void)
|
||||
{
|
||||
if (MyPState->ring_receive < MyPState->ring_unused)
|
||||
prefetch_wait_for(MyPState->ring_unused - 1);
|
||||
|
||||
minPrefetchLsn[MyProcNumber] = InfiniteXLogRecPtr; /* No more in-flight prefetch requests from this backend */
|
||||
|
||||
/*
|
||||
* We know for sure we're not working on any prefetch pages after
|
||||
* this.
|
||||
@@ -879,9 +806,6 @@ prefetch_read(PrefetchRequest *slot)
|
||||
old = MemoryContextSwitchTo(MyPState->errctx);
|
||||
response = (NeonResponse *) page_server->receive(shard_no);
|
||||
MemoryContextSwitchTo(old);
|
||||
|
||||
update_min_prefetch_lsn(my_ring_index);
|
||||
|
||||
if (response)
|
||||
{
|
||||
check_getpage_response(slot, response);
|
||||
@@ -1000,8 +924,6 @@ prefetch_on_ps_disconnect(void)
|
||||
MyNeonCounters->getpage_prefetch_discards_total += 1;
|
||||
}
|
||||
|
||||
minPrefetchLsn[MyProcNumber] = InfiniteXLogRecPtr; /* No more in-flight prefetch requests from this backend */
|
||||
|
||||
/*
|
||||
* We can have gone into retry due to network error, so update stats with
|
||||
* the latest available
|
||||
@@ -1103,8 +1025,6 @@ prefetch_do_request(PrefetchRequest *slot, neon_request_lsns *force_request_lsns
|
||||
Assert(slot->response == NULL);
|
||||
Assert(slot->my_ring_index == MyPState->ring_unused);
|
||||
|
||||
minPrefetchLsn[MyProcNumber] = Min(request.hdr.lsn, minPrefetchLsn[MyProcNumber]);
|
||||
|
||||
while (!page_server->send(slot->shard_no, (NeonRequest *) &request))
|
||||
{
|
||||
Assert(mySlotNo == MyPState->ring_unused);
|
||||
@@ -1125,23 +1045,6 @@ prefetch_do_request(PrefetchRequest *slot, neon_request_lsns *force_request_lsns
|
||||
Assert(!found);
|
||||
}
|
||||
|
||||
/*
|
||||
* Check that returned page LSN is consistent with request lsns
|
||||
*/
|
||||
static void
|
||||
check_page_lsn(NeonGetPageResponse* resp)
|
||||
{
|
||||
if (PageGetLSN(resp->page) > resp->req.hdr.not_modified_since)
|
||||
neon_log(PANIC, "Invalid getpage response version: %X/%08X is higher than last modified LSN %X/%08X",
|
||||
LSN_FORMAT_ARGS(PageGetLSN(resp->page)),
|
||||
LSN_FORMAT_ARGS(resp->req.hdr.not_modified_since));
|
||||
|
||||
if (PageGetLSN(resp->page) > resp->req.hdr.lsn)
|
||||
neon_log(PANIC, "Invalid getpage response version: %X/%08X is higher than request LSN %X/%08X",
|
||||
LSN_FORMAT_ARGS(PageGetLSN(resp->page)),
|
||||
LSN_FORMAT_ARGS(resp->req.hdr.lsn));
|
||||
}
|
||||
|
||||
/*
|
||||
* Lookup of already received prefetch requests. Only already received responses matching required LSNs are accepted.
|
||||
* Present pages are marked in "mask" bitmap and total number of such pages is returned.
|
||||
@@ -1165,7 +1068,7 @@ communicator_prefetch_lookupv(NRelFileInfo rinfo, ForkNumber forknum, BlockNumbe
|
||||
for (int i = 0; i < nblocks; i++)
|
||||
{
|
||||
PrfHashEntry *entry;
|
||||
NeonGetPageResponse* resp;
|
||||
|
||||
hashkey.buftag.blockNum = blocknum + i;
|
||||
entry = prfh_lookup(MyPState->prf_hash, &hashkey);
|
||||
|
||||
@@ -1198,9 +1101,8 @@ communicator_prefetch_lookupv(NRelFileInfo rinfo, ForkNumber forknum, BlockNumbe
|
||||
continue;
|
||||
}
|
||||
Assert(slot->response->tag == T_NeonGetPageResponse); /* checked by check_getpage_response when response was assigned to the slot */
|
||||
resp = (NeonGetPageResponse*)slot->response;
|
||||
check_page_lsn(resp);
|
||||
memcpy(buffers[i], resp->page, BLCKSZ);
|
||||
memcpy(buffers[i], ((NeonGetPageResponse*)slot->response)->page, BLCKSZ);
|
||||
|
||||
|
||||
/*
|
||||
* With lfc_store_prefetch_result=true prefetch result is stored in LFC in prefetch_pump_state when response is received
|
||||
@@ -1551,7 +1453,6 @@ page_server_request(void const *req)
|
||||
PG_TRY();
|
||||
{
|
||||
before_shmem_exit(prefetch_on_exit, Int32GetDatum(shard_no));
|
||||
minPrefetchLsn[MyProcNumber] = ((NeonRequest *)req)->lsn;
|
||||
do
|
||||
{
|
||||
while (!page_server->send(shard_no, (NeonRequest *) req)
|
||||
@@ -1563,12 +1464,10 @@ page_server_request(void const *req)
|
||||
resp = page_server->receive(shard_no);
|
||||
MyNeonCounters->pageserver_open_requests--;
|
||||
} while (resp == NULL);
|
||||
minPrefetchLsn[MyProcNumber] = InfiniteXLogRecPtr;
|
||||
cancel_before_shmem_exit(prefetch_on_exit, Int32GetDatum(shard_no));
|
||||
}
|
||||
PG_CATCH();
|
||||
{
|
||||
minPrefetchLsn[MyProcNumber] = InfiniteXLogRecPtr;
|
||||
cancel_before_shmem_exit(prefetch_on_exit, Int32GetDatum(shard_no));
|
||||
/* Nothing should cancel disconnect: we should not leave connection in opaque state */
|
||||
HOLD_INTERRUPTS();
|
||||
@@ -1989,7 +1888,7 @@ communicator_init(void)
|
||||
* the check here. That's OK, we don't expect the logic to change in old
|
||||
* releases.
|
||||
*/
|
||||
#if PG_MAJORVERSION_NUM >= 15
|
||||
#if PG_VERSION_NUM>=150000
|
||||
if (MyNeonCounters >= &neon_per_backend_counters_shared[NUM_NEON_PERF_COUNTER_SLOTS])
|
||||
elog(ERROR, "MyNeonCounters points past end of array");
|
||||
#endif
|
||||
@@ -2068,7 +1967,7 @@ neon_prefetch_response_usable(neon_request_lsns *request_lsns,
|
||||
* Each request to the pageserver has three LSN values associated with it:
|
||||
* `not_modified_since`, `request_lsn`, and 'effective_request_lsn'.
|
||||
* `not_modified_since` and `request_lsn` are sent to the pageserver, but
|
||||
* in the primary node, we always use InfiniteXLogRecPtr as the `request_lsn`, so
|
||||
* in the primary node, we always use UINT64_MAX as the `request_lsn`, so
|
||||
* we remember `effective_request_lsn` separately. In a primary,
|
||||
* `effective_request_lsn` is the same as `not_modified_since`.
|
||||
* See comments in neon_get_request_lsns why we can not use last flush WAL position here.
|
||||
@@ -2328,7 +2227,6 @@ Retry:
|
||||
case T_NeonGetPageResponse:
|
||||
{
|
||||
NeonGetPageResponse* getpage_resp = (NeonGetPageResponse *) resp;
|
||||
check_page_lsn(getpage_resp);
|
||||
memcpy(buffer, getpage_resp->page, BLCKSZ);
|
||||
|
||||
/*
|
||||
@@ -2526,18 +2424,15 @@ communicator_read_slru_segment(SlruKind kind, int64 segno, neon_request_lsns *re
|
||||
PG_TRY();
|
||||
{
|
||||
before_shmem_exit(prefetch_on_exit, Int32GetDatum(shard_no));
|
||||
minPrefetchLsn[MyProcNumber] = request_lsns->request_lsn;
|
||||
do
|
||||
{
|
||||
while (!page_server->send(shard_no, &request.hdr) || !page_server->flush(shard_no));
|
||||
resp = page_server->receive(shard_no);
|
||||
} while (resp == NULL);
|
||||
minPrefetchLsn[MyProcNumber] = InfiniteXLogRecPtr;
|
||||
cancel_before_shmem_exit(prefetch_on_exit, Int32GetDatum(shard_no));
|
||||
}
|
||||
PG_CATCH();
|
||||
{
|
||||
minPrefetchLsn[MyProcNumber] = InfiniteXLogRecPtr;
|
||||
cancel_before_shmem_exit(prefetch_on_exit, Int32GetDatum(shard_no));
|
||||
/* Nothing should cancel disconnect: we should not leave connection in opaque state */
|
||||
HOLD_INTERRUPTS();
|
||||
@@ -2682,19 +2577,3 @@ communicator_processinterrupts(void)
|
||||
|
||||
return prev_interrupt_cb();
|
||||
}
|
||||
|
||||
PG_FUNCTION_INFO_V1(neon_communicator_min_inflight_request_lsn);
|
||||
|
||||
Datum
|
||||
neon_communicator_min_inflight_request_lsn(PG_FUNCTION_ARGS)
|
||||
{
|
||||
XLogRecPtr min_lsn = RecoveryInProgress()
|
||||
? GetXLogReplayRecPtr(NULL)
|
||||
: InfiniteXLogRecPtr;
|
||||
size_t n_procs = ProcGlobal->allProcCount;
|
||||
for (size_t i = 0; i < n_procs; i++)
|
||||
{
|
||||
min_lsn = Min(min_lsn, minPrefetchLsn[i]);
|
||||
}
|
||||
PG_RETURN_INT64(min_lsn);
|
||||
}
|
||||
|
||||
@@ -46,4 +46,5 @@ extern int communicator_read_slru_segment(SlruKind kind, int64 segno,
|
||||
extern void communicator_reconfigure_timeout_if_needed(void);
|
||||
extern void communicator_prefetch_pump_state(void);
|
||||
|
||||
|
||||
#endif
|
||||
|
||||
@@ -219,6 +219,10 @@ static char *lfc_path;
|
||||
static uint64 lfc_generation;
|
||||
static FileCacheControl *lfc_ctl;
|
||||
static bool lfc_do_prewarm;
|
||||
static shmem_startup_hook_type prev_shmem_startup_hook;
|
||||
#if PG_VERSION_NUM>=150000
|
||||
static shmem_request_hook_type prev_shmem_request_hook;
|
||||
#endif
|
||||
|
||||
bool lfc_store_prefetch_result;
|
||||
bool lfc_prewarm_update_ws_estimation;
|
||||
@@ -338,14 +342,18 @@ lfc_ensure_opened(void)
|
||||
return true;
|
||||
}
|
||||
|
||||
void
|
||||
LfcShmemInit(void)
|
||||
static void
|
||||
lfc_shmem_startup(void)
|
||||
{
|
||||
bool found;
|
||||
static HASHCTL info;
|
||||
|
||||
if (lfc_max_size <= 0)
|
||||
return;
|
||||
if (prev_shmem_startup_hook)
|
||||
{
|
||||
prev_shmem_startup_hook();
|
||||
}
|
||||
|
||||
LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);
|
||||
|
||||
lfc_ctl = (FileCacheControl *) ShmemInitStruct("lfc", sizeof(FileCacheControl), &found);
|
||||
if (!found)
|
||||
@@ -390,16 +398,19 @@ LfcShmemInit(void)
|
||||
ConditionVariableInit(&lfc_ctl->cv[i]);
|
||||
|
||||
}
|
||||
LWLockRelease(AddinShmemInitLock);
|
||||
}
|
||||
|
||||
void
|
||||
LfcShmemRequest(void)
|
||||
static void
|
||||
lfc_shmem_request(void)
|
||||
{
|
||||
if (lfc_max_size > 0)
|
||||
{
|
||||
RequestAddinShmemSpace(sizeof(FileCacheControl) + hash_estimate_size(SIZE_MB_TO_CHUNKS(lfc_max_size) + 1, FILE_CACHE_ENRTY_SIZE));
|
||||
RequestNamedLWLockTranche("lfc_lock", 1);
|
||||
}
|
||||
#if PG_VERSION_NUM>=150000
|
||||
if (prev_shmem_request_hook)
|
||||
prev_shmem_request_hook();
|
||||
#endif
|
||||
|
||||
RequestAddinShmemSpace(sizeof(FileCacheControl) + hash_estimate_size(SIZE_MB_TO_CHUNKS(lfc_max_size) + 1, FILE_CACHE_ENRTY_SIZE));
|
||||
RequestNamedLWLockTranche("lfc_lock", 1);
|
||||
}
|
||||
|
||||
static bool
|
||||
@@ -631,6 +642,18 @@ lfc_init(void)
|
||||
NULL,
|
||||
NULL,
|
||||
NULL);
|
||||
|
||||
if (lfc_max_size == 0)
|
||||
return;
|
||||
|
||||
prev_shmem_startup_hook = shmem_startup_hook;
|
||||
shmem_startup_hook = lfc_shmem_startup;
|
||||
#if PG_VERSION_NUM>=150000
|
||||
prev_shmem_request_hook = shmem_request_hook;
|
||||
shmem_request_hook = lfc_shmem_request;
|
||||
#else
|
||||
lfc_shmem_request();
|
||||
#endif
|
||||
}
|
||||
|
||||
FileCacheState*
|
||||
|
||||
@@ -118,6 +118,10 @@ typedef struct
|
||||
ShardMap shard_map;
|
||||
} PagestoreShmemState;
|
||||
|
||||
#if PG_VERSION_NUM >= 150000
|
||||
static shmem_request_hook_type prev_shmem_request_hook = NULL;
|
||||
#endif
|
||||
static shmem_startup_hook_type prev_shmem_startup_hook;
|
||||
static PagestoreShmemState *pagestore_shared;
|
||||
static uint64 pagestore_local_counter = 0;
|
||||
|
||||
@@ -1280,12 +1284,18 @@ check_neon_id(char **newval, void **extra, GucSource source)
|
||||
return **newval == '\0' || HexDecodeString(id, *newval, 16);
|
||||
}
|
||||
|
||||
static Size
|
||||
PagestoreShmemSize(void)
|
||||
{
|
||||
return add_size(sizeof(PagestoreShmemState), NeonPerfCountersShmemSize());
|
||||
}
|
||||
|
||||
void
|
||||
static bool
|
||||
PagestoreShmemInit(void)
|
||||
{
|
||||
bool found;
|
||||
|
||||
LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);
|
||||
pagestore_shared = ShmemInitStruct("libpagestore shared state",
|
||||
sizeof(PagestoreShmemState),
|
||||
&found);
|
||||
@@ -1296,12 +1306,44 @@ PagestoreShmemInit(void)
|
||||
memset(&pagestore_shared->shard_map, 0, sizeof(ShardMap));
|
||||
AssignPageserverConnstring(page_server_connstring, NULL);
|
||||
}
|
||||
|
||||
NeonPerfCountersShmemInit();
|
||||
|
||||
LWLockRelease(AddinShmemInitLock);
|
||||
return found;
|
||||
}
|
||||
|
||||
void
|
||||
PagestoreShmemRequest(void)
|
||||
static void
|
||||
pagestore_shmem_startup_hook(void)
|
||||
{
|
||||
RequestAddinShmemSpace(sizeof(PagestoreShmemState));
|
||||
if (prev_shmem_startup_hook)
|
||||
prev_shmem_startup_hook();
|
||||
|
||||
PagestoreShmemInit();
|
||||
}
|
||||
|
||||
static void
|
||||
pagestore_shmem_request(void)
|
||||
{
|
||||
#if PG_VERSION_NUM >= 150000
|
||||
if (prev_shmem_request_hook)
|
||||
prev_shmem_request_hook();
|
||||
#endif
|
||||
|
||||
RequestAddinShmemSpace(PagestoreShmemSize());
|
||||
}
|
||||
|
||||
static void
|
||||
pagestore_prepare_shmem(void)
|
||||
{
|
||||
#if PG_VERSION_NUM >= 150000
|
||||
prev_shmem_request_hook = shmem_request_hook;
|
||||
shmem_request_hook = pagestore_shmem_request;
|
||||
#else
|
||||
pagestore_shmem_request();
|
||||
#endif
|
||||
prev_shmem_startup_hook = shmem_startup_hook;
|
||||
shmem_startup_hook = pagestore_shmem_startup_hook;
|
||||
}
|
||||
|
||||
/*
|
||||
@@ -1310,6 +1352,8 @@ PagestoreShmemRequest(void)
|
||||
void
|
||||
pg_init_libpagestore(void)
|
||||
{
|
||||
pagestore_prepare_shmem();
|
||||
|
||||
DefineCustomStringVariable("neon.pageserver_connstring",
|
||||
"connection string to the page server",
|
||||
NULL,
|
||||
@@ -1460,6 +1504,8 @@ pg_init_libpagestore(void)
|
||||
0,
|
||||
NULL, NULL, NULL);
|
||||
|
||||
relsize_hash_init();
|
||||
|
||||
if (page_server != NULL)
|
||||
neon_log(ERROR, "libpagestore already loaded");
|
||||
|
||||
|
||||
@@ -1,3 +0,0 @@
|
||||
create function neon_communicator_min_inflight_request_lsn() returns pg_catalog.pg_lsn
|
||||
AS 'MODULE_PATHNAME', 'neon_communicator_min_inflight_request_lsn'
|
||||
LANGUAGE C;
|
||||
@@ -1 +0,0 @@
|
||||
drop function neon_communicator_min_inflight_request_lsn();
|
||||
@@ -22,7 +22,6 @@
|
||||
#include "replication/slot.h"
|
||||
#include "replication/walsender.h"
|
||||
#include "storage/proc.h"
|
||||
#include "storage/ipc.h"
|
||||
#include "funcapi.h"
|
||||
#include "access/htup_details.h"
|
||||
#include "utils/builtins.h"
|
||||
@@ -60,15 +59,11 @@ static ExecutorEnd_hook_type prev_ExecutorEnd = NULL;
|
||||
static void neon_ExecutorStart(QueryDesc *queryDesc, int eflags);
|
||||
static void neon_ExecutorEnd(QueryDesc *queryDesc);
|
||||
|
||||
#if PG_MAJORVERSION_NUM >= 16
|
||||
static shmem_startup_hook_type prev_shmem_startup_hook;
|
||||
|
||||
static void neon_shmem_startup_hook(void);
|
||||
static void neon_shmem_request_hook(void);
|
||||
|
||||
#if PG_MAJORVERSION_NUM >= 15
|
||||
static shmem_request_hook_type prev_shmem_request_hook = NULL;
|
||||
#endif
|
||||
|
||||
|
||||
#if PG_MAJORVERSION_NUM >= 17
|
||||
uint32 WAIT_EVENT_NEON_LFC_MAINTENANCE;
|
||||
uint32 WAIT_EVENT_NEON_LFC_READ;
|
||||
@@ -455,13 +450,15 @@ _PG_init(void)
|
||||
*/
|
||||
#if PG_VERSION_NUM >= 160000
|
||||
load_file("$libdir/neon_rmgr", false);
|
||||
|
||||
prev_shmem_startup_hook = shmem_startup_hook;
|
||||
shmem_startup_hook = neon_shmem_startup_hook;
|
||||
#endif
|
||||
|
||||
/* dummy call to a Rust function in the communicator library, to check that it works */
|
||||
(void) communicator_dummy(123);
|
||||
|
||||
pg_init_libpagestore();
|
||||
relsize_hash_init();
|
||||
lfc_init();
|
||||
pg_init_walproposer();
|
||||
init_lwlsncache();
|
||||
@@ -555,16 +552,6 @@ _PG_init(void)
|
||||
|
||||
ReportSearchPath();
|
||||
|
||||
#if PG_VERSION_NUM >= 150000
|
||||
prev_shmem_request_hook = shmem_request_hook;
|
||||
shmem_request_hook = neon_shmem_request_hook;
|
||||
#else
|
||||
neon_shmem_request_hook();
|
||||
#endif
|
||||
prev_shmem_startup_hook = shmem_startup_hook;
|
||||
shmem_startup_hook = neon_shmem_startup_hook;
|
||||
|
||||
|
||||
prev_ExecutorStart = ExecutorStart_hook;
|
||||
ExecutorStart_hook = neon_ExecutorStart;
|
||||
prev_ExecutorEnd = ExecutorEnd_hook;
|
||||
@@ -650,24 +637,7 @@ approximate_working_set_size(PG_FUNCTION_ARGS)
|
||||
PG_RETURN_INT32(dc);
|
||||
}
|
||||
|
||||
static void
|
||||
neon_shmem_request_hook(void)
|
||||
{
|
||||
#if PG_VERSION_NUM >= 150000
|
||||
if (prev_shmem_request_hook)
|
||||
prev_shmem_request_hook();
|
||||
#endif
|
||||
|
||||
LfcShmemRequest();
|
||||
NeonPerfCountersShmemRequest();
|
||||
PagestoreShmemRequest();
|
||||
RelsizeCacheShmemRequest();
|
||||
CommunicatorShmemRequest();
|
||||
WalproposerShmemRequest();
|
||||
LwLsnCacheShmemRequest();
|
||||
}
|
||||
|
||||
|
||||
#if PG_MAJORVERSION_NUM >= 16
|
||||
static void
|
||||
neon_shmem_startup_hook(void)
|
||||
{
|
||||
@@ -675,16 +645,6 @@ neon_shmem_startup_hook(void)
|
||||
if (prev_shmem_startup_hook)
|
||||
prev_shmem_startup_hook();
|
||||
|
||||
LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);
|
||||
|
||||
LfcShmemInit();
|
||||
NeonPerfCountersShmemInit();
|
||||
PagestoreShmemInit();
|
||||
RelsizeCacheShmemInit();
|
||||
CommunicatorShmemInit();
|
||||
WalproposerShmemInit();
|
||||
LwLsnCacheShmemInit();
|
||||
|
||||
#if PG_MAJORVERSION_NUM >= 17
|
||||
WAIT_EVENT_NEON_LFC_MAINTENANCE = WaitEventExtensionNew("Neon/FileCache_Maintenance");
|
||||
WAIT_EVENT_NEON_LFC_READ = WaitEventExtensionNew("Neon/FileCache_Read");
|
||||
@@ -697,9 +657,8 @@ neon_shmem_startup_hook(void)
|
||||
WAIT_EVENT_NEON_PS_READ = WaitEventExtensionNew("Neon/PS_ReadIO");
|
||||
WAIT_EVENT_NEON_WAL_DL = WaitEventExtensionNew("Neon/WAL_Download");
|
||||
#endif
|
||||
|
||||
LWLockRelease(AddinShmemInitLock);
|
||||
}
|
||||
#endif
|
||||
|
||||
/*
|
||||
* ExecutorStart hook: start up tracking if needed
|
||||
|
||||
@@ -58,7 +58,6 @@ extern uint32 WAIT_EVENT_NEON_WAL_DL;
|
||||
(errmsg(NEON_TAG "[shard %d] " fmt, shard_no, ##__VA_ARGS__), \
|
||||
errhidestmt(true), errhidecontext(true), errposition(0), internalerrposition(0)))
|
||||
|
||||
#define InfiniteXLogRecPtr UINT64_MAX
|
||||
|
||||
extern void pg_init_libpagestore(void);
|
||||
extern void pg_init_walproposer(void);
|
||||
@@ -72,21 +71,4 @@ extern PGDLLEXPORT void WalProposerSync(int argc, char *argv[]);
|
||||
extern PGDLLEXPORT void WalProposerMain(Datum main_arg);
|
||||
extern PGDLLEXPORT void LogicalSlotsMonitorMain(Datum main_arg);
|
||||
|
||||
extern void LfcShmemRequest(void);
|
||||
extern void PagestoreShmemRequest(void);
|
||||
extern void RelsizeCacheShmemRequest(void);
|
||||
extern void CommunicatorShmemRequest(void);
|
||||
extern void WalproposerShmemRequest(void);
|
||||
extern void LwLsnCacheShmemRequest(void);
|
||||
extern void NeonPerfCountersShmemRequest(void);
|
||||
|
||||
extern void LfcShmemInit(void);
|
||||
extern void PagestoreShmemInit(void);
|
||||
extern void RelsizeCacheShmemInit(void);
|
||||
extern void CommunicatorShmemInit(void);
|
||||
extern void WalproposerShmemInit(void);
|
||||
extern void LwLsnCacheShmemInit(void);
|
||||
extern void NeonPerfCountersShmemInit(void);
|
||||
|
||||
|
||||
#endif /* NEON_H */
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
#include "postgres.h"
|
||||
|
||||
#include "neon.h"
|
||||
#include "neon_lwlsncache.h"
|
||||
|
||||
#include "miscadmin.h"
|
||||
@@ -82,6 +81,14 @@ static set_max_lwlsn_hook_type prev_set_max_lwlsn_hook = NULL;
|
||||
static set_lwlsn_relation_hook_type prev_set_lwlsn_relation_hook = NULL;
|
||||
static set_lwlsn_db_hook_type prev_set_lwlsn_db_hook = NULL;
|
||||
|
||||
static shmem_startup_hook_type prev_shmem_startup_hook;
|
||||
|
||||
#if PG_VERSION_NUM >= 150000
|
||||
static shmem_request_hook_type prev_shmem_request_hook;
|
||||
#endif
|
||||
|
||||
static void shmemrequest(void);
|
||||
static void shmeminit(void);
|
||||
static void neon_set_max_lwlsn(XLogRecPtr lsn);
|
||||
|
||||
void
|
||||
@@ -92,6 +99,16 @@ init_lwlsncache(void)
|
||||
|
||||
lwlc_register_gucs();
|
||||
|
||||
prev_shmem_startup_hook = shmem_startup_hook;
|
||||
shmem_startup_hook = shmeminit;
|
||||
|
||||
#if PG_VERSION_NUM >= 150000
|
||||
prev_shmem_request_hook = shmem_request_hook;
|
||||
shmem_request_hook = shmemrequest;
|
||||
#else
|
||||
shmemrequest();
|
||||
#endif
|
||||
|
||||
prev_set_lwlsn_block_range_hook = set_lwlsn_block_range_hook;
|
||||
set_lwlsn_block_range_hook = neon_set_lwlsn_block_range;
|
||||
prev_set_lwlsn_block_v_hook = set_lwlsn_block_v_hook;
|
||||
@@ -107,19 +124,20 @@ init_lwlsncache(void)
|
||||
}
|
||||
|
||||
|
||||
void
|
||||
LwLsnCacheShmemRequest(void)
|
||||
{
|
||||
static void shmemrequest(void) {
|
||||
Size requested_size = sizeof(LwLsnCacheCtl);
|
||||
|
||||
|
||||
requested_size += hash_estimate_size(lwlsn_cache_size, sizeof(LastWrittenLsnCacheEntry));
|
||||
|
||||
RequestAddinShmemSpace(requested_size);
|
||||
|
||||
#if PG_VERSION_NUM >= 150000
|
||||
if (prev_shmem_request_hook)
|
||||
prev_shmem_request_hook();
|
||||
#endif
|
||||
}
|
||||
|
||||
void
|
||||
LwLsnCacheShmemInit(void)
|
||||
{
|
||||
static void shmeminit(void) {
|
||||
static HASHCTL info;
|
||||
bool found;
|
||||
if (lwlsn_cache_size > 0)
|
||||
@@ -139,6 +157,9 @@ LwLsnCacheShmemInit(void)
|
||||
}
|
||||
dlist_init(&LwLsnCache->lastWrittenLsnLRU);
|
||||
LwLsnCache->maxLastWrittenLsn = GetRedoRecPtr();
|
||||
if (prev_shmem_startup_hook) {
|
||||
prev_shmem_startup_hook();
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
|
||||
@@ -17,21 +17,22 @@
|
||||
#include "storage/shmem.h"
|
||||
#include "utils/builtins.h"
|
||||
|
||||
#include "neon.h"
|
||||
#include "neon_perf_counters.h"
|
||||
#include "neon_pgversioncompat.h"
|
||||
|
||||
neon_per_backend_counters *neon_per_backend_counters_shared;
|
||||
|
||||
void
|
||||
NeonPerfCountersShmemRequest(void)
|
||||
Size
|
||||
NeonPerfCountersShmemSize(void)
|
||||
{
|
||||
Size size = mul_size(NUM_NEON_PERF_COUNTER_SLOTS, sizeof(neon_per_backend_counters));
|
||||
RequestAddinShmemSpace(size);
|
||||
Size size = 0;
|
||||
|
||||
size = add_size(size, mul_size(NUM_NEON_PERF_COUNTER_SLOTS,
|
||||
sizeof(neon_per_backend_counters)));
|
||||
|
||||
return size;
|
||||
}
|
||||
|
||||
|
||||
|
||||
void
|
||||
NeonPerfCountersShmemInit(void)
|
||||
{
|
||||
|
||||
@@ -250,6 +250,7 @@ extern const f_smgr *smgr_neon(ProcNumber backend, NRelFileInfo rinfo);
|
||||
extern void smgr_init_neon(void);
|
||||
extern void readahead_buffer_resize(int newsize, void *extra);
|
||||
|
||||
|
||||
/*
|
||||
* LSN values associated with each request to the pageserver
|
||||
*/
|
||||
|
||||
@@ -675,7 +675,7 @@ neon_get_request_lsns(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
|
||||
* always have that problem as they can always lag behind the
|
||||
* primary, but for the primary we can avoid it by always
|
||||
* requesting the latest page, by setting request LSN to
|
||||
* InfiniteXLogRecPtr.
|
||||
* UINT64_MAX.
|
||||
*
|
||||
* effective_request_lsn is used to check that received response is still valid.
|
||||
* In case of primary node it is last written LSN. Originally we used flush_lsn here,
|
||||
@@ -703,7 +703,7 @@ neon_get_request_lsns(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
|
||||
* The problem can be fixed by calling GetFlushRecPtr() before checking if the page is in the buffer cache.
|
||||
* But you can't do that within smgrprefetch(), would need to modify the caller.
|
||||
*/
|
||||
result->request_lsn = InfiniteXLogRecPtr;
|
||||
result->request_lsn = UINT64_MAX;
|
||||
result->not_modified_since = last_written_lsn;
|
||||
result->effective_request_lsn = last_written_lsn;
|
||||
}
|
||||
@@ -2158,7 +2158,7 @@ neon_read_slru_segment(SMgrRelation reln, const char* path, int segno, void* buf
|
||||
request_lsn = nm_adjust_lsn(request_lsn);
|
||||
}
|
||||
else
|
||||
request_lsn = InfiniteXLogRecPtr;
|
||||
request_lsn = UINT64_MAX;
|
||||
|
||||
/*
|
||||
* GetRedoStartLsn() returns LSN of the basebackup. We know that the SLRU
|
||||
|
||||
@@ -10,7 +10,6 @@
|
||||
*/
|
||||
#include "postgres.h"
|
||||
|
||||
#include "neon.h"
|
||||
#include "neon_pgversioncompat.h"
|
||||
|
||||
#include "pagestore_client.h"
|
||||
@@ -50,23 +49,32 @@ typedef struct
|
||||
* algorithm */
|
||||
} RelSizeHashControl;
|
||||
|
||||
static HTAB *relsize_hash;
|
||||
static LWLockId relsize_lock;
|
||||
static int relsize_hash_size;
|
||||
static RelSizeHashControl* relsize_ctl;
|
||||
static shmem_startup_hook_type prev_shmem_startup_hook = NULL;
|
||||
#if PG_VERSION_NUM >= 150000
|
||||
static shmem_request_hook_type prev_shmem_request_hook = NULL;
|
||||
static void relsize_shmem_request(void);
|
||||
#endif
|
||||
|
||||
/*
|
||||
* Size of a cache entry is 36 bytes. So this default will take about 2.3 MB,
|
||||
* which seems reasonable.
|
||||
*/
|
||||
#define DEFAULT_RELSIZE_HASH_SIZE (64 * 1024)
|
||||
|
||||
static HTAB *relsize_hash;
|
||||
static LWLockId relsize_lock;
|
||||
static int relsize_hash_size = DEFAULT_RELSIZE_HASH_SIZE;
|
||||
static RelSizeHashControl* relsize_ctl;
|
||||
|
||||
void
|
||||
RelsizeCacheShmemInit(void)
|
||||
static void
|
||||
neon_smgr_shmem_startup(void)
|
||||
{
|
||||
static HASHCTL info;
|
||||
bool found;
|
||||
|
||||
if (prev_shmem_startup_hook)
|
||||
prev_shmem_startup_hook();
|
||||
|
||||
LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);
|
||||
relsize_ctl = (RelSizeHashControl *) ShmemInitStruct("relsize_hash", sizeof(RelSizeHashControl), &found);
|
||||
if (!found)
|
||||
{
|
||||
@@ -77,6 +85,7 @@ RelsizeCacheShmemInit(void)
|
||||
relsize_hash_size, relsize_hash_size,
|
||||
&info,
|
||||
HASH_ELEM | HASH_BLOBS);
|
||||
LWLockRelease(AddinShmemInitLock);
|
||||
relsize_ctl->size = 0;
|
||||
relsize_ctl->hits = 0;
|
||||
relsize_ctl->misses = 0;
|
||||
@@ -233,15 +242,34 @@ relsize_hash_init(void)
|
||||
PGC_POSTMASTER,
|
||||
0,
|
||||
NULL, NULL, NULL);
|
||||
|
||||
if (relsize_hash_size > 0)
|
||||
{
|
||||
#if PG_VERSION_NUM >= 150000
|
||||
prev_shmem_request_hook = shmem_request_hook;
|
||||
shmem_request_hook = relsize_shmem_request;
|
||||
#else
|
||||
RequestAddinShmemSpace(hash_estimate_size(relsize_hash_size, sizeof(RelSizeEntry)));
|
||||
RequestNamedLWLockTranche("neon_relsize", 1);
|
||||
#endif
|
||||
|
||||
prev_shmem_startup_hook = shmem_startup_hook;
|
||||
shmem_startup_hook = neon_smgr_shmem_startup;
|
||||
}
|
||||
}
|
||||
|
||||
#if PG_VERSION_NUM >= 150000
|
||||
/*
|
||||
* shmem_request hook: request additional shared resources. We'll allocate or
|
||||
* attach to the shared resources in neon_smgr_shmem_startup().
|
||||
*/
|
||||
void
|
||||
RelsizeCacheShmemRequest(void)
|
||||
static void
|
||||
relsize_shmem_request(void)
|
||||
{
|
||||
if (prev_shmem_request_hook)
|
||||
prev_shmem_request_hook();
|
||||
|
||||
RequestAddinShmemSpace(sizeof(RelSizeHashControl) + hash_estimate_size(relsize_hash_size, sizeof(RelSizeEntry)));
|
||||
RequestNamedLWLockTranche("neon_relsize", 1);
|
||||
}
|
||||
#endif
|
||||
|
||||
@@ -83,8 +83,10 @@ static XLogRecPtr standby_flush_lsn = InvalidXLogRecPtr;
|
||||
static XLogRecPtr standby_apply_lsn = InvalidXLogRecPtr;
|
||||
static HotStandbyFeedback agg_hs_feedback;
|
||||
|
||||
static void nwp_shmem_startup_hook(void);
|
||||
static void nwp_register_gucs(void);
|
||||
static void assign_neon_safekeepers(const char *newval, void *extra);
|
||||
static void nwp_prepare_shmem(void);
|
||||
static uint64 backpressure_lag_impl(void);
|
||||
static uint64 startup_backpressure_wrap(void);
|
||||
static bool backpressure_throttling_impl(void);
|
||||
@@ -97,6 +99,11 @@ static TimestampTz walprop_pg_get_current_timestamp(WalProposer *wp);
|
||||
static void walprop_pg_load_libpqwalreceiver(void);
|
||||
|
||||
static process_interrupts_callback_t PrevProcessInterruptsCallback = NULL;
|
||||
static shmem_startup_hook_type prev_shmem_startup_hook_type;
|
||||
#if PG_VERSION_NUM >= 150000
|
||||
static shmem_request_hook_type prev_shmem_request_hook = NULL;
|
||||
static void walproposer_shmem_request(void);
|
||||
#endif
|
||||
static void WalproposerShmemInit_SyncSafekeeper(void);
|
||||
|
||||
|
||||
@@ -186,6 +193,8 @@ pg_init_walproposer(void)
|
||||
|
||||
nwp_register_gucs();
|
||||
|
||||
nwp_prepare_shmem();
|
||||
|
||||
delay_backend_us = &startup_backpressure_wrap;
|
||||
PrevProcessInterruptsCallback = ProcessInterruptsCallback;
|
||||
ProcessInterruptsCallback = backpressure_throttling_impl;
|
||||
@@ -473,11 +482,12 @@ WalproposerShmemSize(void)
|
||||
return sizeof(WalproposerShmemState);
|
||||
}
|
||||
|
||||
void
|
||||
static bool
|
||||
WalproposerShmemInit(void)
|
||||
{
|
||||
bool found;
|
||||
|
||||
LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);
|
||||
walprop_shared = ShmemInitStruct("Walproposer shared state",
|
||||
sizeof(WalproposerShmemState),
|
||||
&found);
|
||||
@@ -494,6 +504,9 @@ WalproposerShmemInit(void)
|
||||
pg_atomic_init_u32(&walprop_shared->wal_rate_limiter.should_limit, 0);
|
||||
/* END_HADRON */
|
||||
}
|
||||
LWLockRelease(AddinShmemInitLock);
|
||||
|
||||
return found;
|
||||
}
|
||||
|
||||
static void
|
||||
@@ -596,15 +609,42 @@ walprop_register_bgworker(void)
|
||||
|
||||
/* shmem handling */
|
||||
|
||||
static void
|
||||
nwp_prepare_shmem(void)
|
||||
{
|
||||
#if PG_VERSION_NUM >= 150000
|
||||
prev_shmem_request_hook = shmem_request_hook;
|
||||
shmem_request_hook = walproposer_shmem_request;
|
||||
#else
|
||||
RequestAddinShmemSpace(WalproposerShmemSize());
|
||||
#endif
|
||||
prev_shmem_startup_hook_type = shmem_startup_hook;
|
||||
shmem_startup_hook = nwp_shmem_startup_hook;
|
||||
}
|
||||
|
||||
#if PG_VERSION_NUM >= 150000
|
||||
/*
|
||||
* shmem_request hook: request additional shared resources. We'll allocate or
|
||||
* attach to the shared resources in WalproposerShmemInit().
|
||||
* attach to the shared resources in nwp_shmem_startup_hook().
|
||||
*/
|
||||
void
|
||||
WalproposerShmemRequest(void)
|
||||
static void
|
||||
walproposer_shmem_request(void)
|
||||
{
|
||||
if (prev_shmem_request_hook)
|
||||
prev_shmem_request_hook();
|
||||
|
||||
RequestAddinShmemSpace(WalproposerShmemSize());
|
||||
}
|
||||
#endif
|
||||
|
||||
static void
|
||||
nwp_shmem_startup_hook(void)
|
||||
{
|
||||
if (prev_shmem_startup_hook_type)
|
||||
prev_shmem_startup_hook_type();
|
||||
|
||||
WalproposerShmemInit();
|
||||
}
|
||||
|
||||
WalproposerShmemState *
|
||||
GetWalpropShmemState(void)
|
||||
|
||||
@@ -120,6 +120,7 @@ workspace_hack.workspace = true
|
||||
[dev-dependencies]
|
||||
assert-json-diff.workspace = true
|
||||
camino-tempfile.workspace = true
|
||||
criterion.workspace = true
|
||||
fallible-iterator.workspace = true
|
||||
flate2.workspace = true
|
||||
tokio-tungstenite.workspace = true
|
||||
@@ -130,3 +131,8 @@ walkdir.workspace = true
|
||||
rand_distr = "0.4"
|
||||
tokio-postgres.workspace = true
|
||||
tracing-test = "0.2"
|
||||
|
||||
[[bench]]
|
||||
name = "logging"
|
||||
harness = false
|
||||
|
||||
|
||||
127
proxy/benches/logging.rs
Normal file
127
proxy/benches/logging.rs
Normal file
@@ -0,0 +1,127 @@
|
||||
use std::io;
|
||||
|
||||
use criterion::{Criterion, criterion_group, criterion_main};
|
||||
use proxy::logging::{Clock, JsonLoggingLayer};
|
||||
use tracing_subscriber::prelude::*;
|
||||
|
||||
struct DevNullWriter;
|
||||
|
||||
impl proxy::logging::MakeWriter for DevNullWriter {
|
||||
fn make_writer(&self) -> impl io::Write {
|
||||
DevNullWriter
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> tracing_subscriber::fmt::MakeWriter<'a> for DevNullWriter {
|
||||
type Writer = DevNullWriter;
|
||||
fn make_writer(&'a self) -> Self::Writer {
|
||||
DevNullWriter
|
||||
}
|
||||
}
|
||||
|
||||
impl io::Write for DevNullWriter {
|
||||
#[inline]
|
||||
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
|
||||
Ok(criterion::black_box(buf).len())
|
||||
}
|
||||
|
||||
#[inline(always)]
|
||||
fn flush(&mut self) -> io::Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
struct FixedClock;
|
||||
|
||||
impl Clock for FixedClock {
|
||||
fn now(&self) -> chrono::DateTime<chrono::Utc> {
|
||||
const { chrono::DateTime::from_timestamp_nanos(1747859990_000_000_000).to_utc() }
|
||||
}
|
||||
}
|
||||
|
||||
pub fn bench_logging(c: &mut Criterion) {
|
||||
c.bench_function("text fmt current", |b| {
|
||||
let registry = tracing_subscriber::Registry::default().with(
|
||||
tracing_subscriber::fmt::layer()
|
||||
.with_ansi(false)
|
||||
.with_target(false)
|
||||
.with_writer(DevNullWriter),
|
||||
);
|
||||
|
||||
tracing::subscriber::with_default(registry, || {
|
||||
tracing::info_span!("span1", a = 42, b = true, c = "string").in_scope(|| {
|
||||
tracing::info_span!("span2", a = 42, b = true, c = "string").in_scope(|| {
|
||||
b.iter(|| {
|
||||
tracing::error!(a = 42, b = true, c = "string", "message field");
|
||||
})
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
c.bench_function("text fmt full", |b| {
|
||||
let registry = tracing_subscriber::Registry::default().with(
|
||||
tracing_subscriber::fmt::layer()
|
||||
.with_level(true)
|
||||
.with_file(true)
|
||||
.with_line_number(true)
|
||||
.with_target(true)
|
||||
.with_thread_ids(true)
|
||||
.with_writer(DevNullWriter),
|
||||
);
|
||||
|
||||
tracing::subscriber::with_default(registry, || {
|
||||
tracing::info_span!("span1", a = 42, b = true, c = "string").in_scope(|| {
|
||||
tracing::info_span!("span2", a = 42, b = true, c = "string").in_scope(|| {
|
||||
b.iter(|| {
|
||||
tracing::error!(a = 42, b = true, c = "string", "message field");
|
||||
})
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
c.bench_function("json fmt", |b| {
|
||||
let registry = tracing_subscriber::Registry::default().with(
|
||||
tracing_subscriber::fmt::layer()
|
||||
.with_level(true)
|
||||
.with_file(true)
|
||||
.with_line_number(true)
|
||||
.with_target(true)
|
||||
.with_thread_ids(true)
|
||||
.with_writer(DevNullWriter)
|
||||
.json(),
|
||||
);
|
||||
|
||||
tracing::subscriber::with_default(registry, || {
|
||||
tracing::info_span!("span1", a = 42, b = true, c = "string").in_scope(|| {
|
||||
tracing::info_span!("span2", a = 42, b = true, c = "string").in_scope(|| {
|
||||
b.iter(|| {
|
||||
tracing::error!(a = 42, b = true, c = "string", "message field");
|
||||
})
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
c.bench_function("json custom", |b| {
|
||||
let registry = tracing_subscriber::Registry::default().with(JsonLoggingLayer::new(
|
||||
FixedClock,
|
||||
DevNullWriter,
|
||||
&["a"],
|
||||
));
|
||||
|
||||
tracing::subscriber::with_default(registry, || {
|
||||
tracing::info_span!("span1", a = 42, b = true, c = "string").in_scope(|| {
|
||||
tracing::info_span!("span2", a = 42, b = true, c = "string").in_scope(|| {
|
||||
b.iter(|| {
|
||||
tracing::error!(a = 42, b = true, c = "string", "message field");
|
||||
})
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
criterion_group!(benches, bench_logging);
|
||||
criterion_main!(benches);
|
||||
@@ -93,7 +93,7 @@ mod ext;
|
||||
mod http;
|
||||
mod intern;
|
||||
mod jemalloc;
|
||||
mod logging;
|
||||
pub mod logging;
|
||||
mod metrics;
|
||||
mod parse;
|
||||
mod pglb;
|
||||
|
||||
@@ -148,11 +148,11 @@ impl LogFormat {
|
||||
}
|
||||
}
|
||||
|
||||
trait MakeWriter {
|
||||
pub trait MakeWriter {
|
||||
fn make_writer(&self) -> impl io::Write;
|
||||
}
|
||||
|
||||
struct StderrWriter {
|
||||
pub struct StderrWriter {
|
||||
stderr: io::Stderr,
|
||||
}
|
||||
|
||||
@@ -164,11 +164,11 @@ impl MakeWriter for StderrWriter {
|
||||
}
|
||||
|
||||
// TODO: move into separate module or even separate crate.
|
||||
trait Clock {
|
||||
pub trait Clock {
|
||||
fn now(&self) -> DateTime<Utc>;
|
||||
}
|
||||
|
||||
struct RealClock;
|
||||
pub struct RealClock;
|
||||
|
||||
impl Clock for RealClock {
|
||||
#[inline]
|
||||
@@ -203,7 +203,7 @@ type CallsiteMap<T> =
|
||||
papaya::HashMap<callsite::Identifier, T, std::hash::BuildHasherDefault<rustc_hash::FxHasher>>;
|
||||
|
||||
/// Implements tracing layer to handle events specific to logging.
|
||||
struct JsonLoggingLayer<C: Clock, W: MakeWriter> {
|
||||
pub struct JsonLoggingLayer<C: Clock, W: MakeWriter> {
|
||||
clock: C,
|
||||
writer: W,
|
||||
|
||||
@@ -217,7 +217,7 @@ struct JsonLoggingLayer<C: Clock, W: MakeWriter> {
|
||||
}
|
||||
|
||||
impl<C: Clock, W: MakeWriter> JsonLoggingLayer<C, W> {
|
||||
fn new(clock: C, writer: W, extract_fields: &'static [&'static str]) -> Self {
|
||||
pub fn new(clock: C, writer: W, extract_fields: &'static [&'static str]) -> Self {
|
||||
JsonLoggingLayer {
|
||||
clock,
|
||||
skipped_field_indices: CallsiteMap::default(),
|
||||
|
||||
@@ -110,7 +110,7 @@ where
|
||||
debug!(error = ?err, COULD_NOT_CONNECT);
|
||||
|
||||
let node_info = if !node_info.cached() || !err.should_retry_wake_compute() {
|
||||
// If we just recieved this from cplane and didn't get it from cache, we shouldn't retry.
|
||||
// If we just received this from cplane and not from the cache, we shouldn't retry.
|
||||
// Do not need to retrieve a new node_info, just return the old one.
|
||||
if !should_retry(&err, num_retries, compute.retry) {
|
||||
Metrics::get().proxy.retries_metric.observe(
|
||||
|
||||
@@ -58,6 +58,7 @@ metrics.workspace = true
|
||||
pem.workspace = true
|
||||
postgres_backend.workspace = true
|
||||
postgres_ffi.workspace = true
|
||||
postgres_ffi_types.workspace = true
|
||||
postgres_versioninfo.workspace = true
|
||||
pq_proto.workspace = true
|
||||
remote_storage.workspace = true
|
||||
|
||||
@@ -12,7 +12,8 @@ use futures::FutureExt;
|
||||
use itertools::Itertools;
|
||||
use parking_lot::Mutex;
|
||||
use postgres_backend::{CopyStreamHandlerEnd, PostgresBackend, PostgresBackendReader, QueryError};
|
||||
use postgres_ffi::{MAX_SEND_SIZE, PgMajorVersion, TimestampTz, get_current_timestamp};
|
||||
use postgres_ffi::{MAX_SEND_SIZE, PgMajorVersion, get_current_timestamp};
|
||||
use postgres_ffi_types::TimestampTz;
|
||||
use pq_proto::{BeMessage, WalSndKeepAlive, XLogDataBody};
|
||||
use safekeeper_api::Term;
|
||||
use safekeeper_api::models::{
|
||||
|
||||
@@ -728,7 +728,7 @@ class NeonEnvBuilder:
|
||||
# NB: neon_local rewrites postgresql.conf on each start based on neon_local config. No need to patch it.
|
||||
# However, in this new NeonEnv, the pageservers and safekeepers listen on different ports, and the storage
|
||||
# controller will currently reject re-attach requests from them because the NodeMetadata isn't identical.
|
||||
# So, from_repo_dir patches up the the storcon database.
|
||||
# So, from_repo_dir patches up the storcon database.
|
||||
patch_script_path = self.repo_dir / "storage_controller_db.startup.sql"
|
||||
assert not patch_script_path.exists()
|
||||
patch_script = ""
|
||||
|
||||
@@ -24,10 +24,7 @@ def test_local_corruption(neon_env_builder: NeonEnvBuilder):
|
||||
[
|
||||
".*get_values_reconstruct_data for layer .*",
|
||||
".*could not find data for key.*",
|
||||
".*is not active. Current state: Broken.*",
|
||||
".*will not become active. Current state: Broken.*",
|
||||
".*failed to load metadata.*",
|
||||
".*load failed.*load local timeline.*",
|
||||
".*: layer load failed, assuming permanent failure:.*",
|
||||
".*failed to get checkpoint bytes.*",
|
||||
".*failed to get control bytes.*",
|
||||
|
||||
@@ -687,7 +687,7 @@ def test_sharding_compaction(
|
||||
for _i in range(0, 10):
|
||||
# Each of these does some writes then a checkpoint: because we set image_creation_threshold to 1,
|
||||
# these should result in image layers each time we write some data into a shard, and also shards
|
||||
# recieving less data hitting their "empty image layer" path (wherre they should skip writing the layer,
|
||||
# receiving less data hitting their "empty image layer" path (where they should skip writing the layer,
|
||||
# rather than asserting)
|
||||
workload.churn_rows(64)
|
||||
|
||||
|
||||
@@ -76,7 +76,6 @@ def test_tenants_normal_work(neon_env_builder: NeonEnvBuilder):
|
||||
neon_env_builder.num_safekeepers = 3
|
||||
|
||||
env = neon_env_builder.init_start()
|
||||
"""Tests tenants with and without wal acceptors"""
|
||||
tenant_1, _ = env.create_tenant()
|
||||
tenant_2, _ = env.create_tenant()
|
||||
|
||||
|
||||
Reference in New Issue
Block a user