Compare commits

...

20 Commits

Author SHA1 Message Date
Konstantin Knizhnik
a170e49e91 Bump postgres version 2023-12-15 16:40:44 +02:00
Konstantin Knizhnik
1cae50eacc On demand downloading of SLRU segments 2023-12-15 16:16:50 +02:00
Konstantin Knizhnik
ac17c2f69c Fix problem with stats collector at pg14 2023-12-13 19:24:14 +02:00
Konstantin Knizhnik
322dd3cf00 Add [NEON_SMGR] to all messages produced by Neon exrtension 2023-12-12 15:55:17 +02:00
Konstantin Knizhnik
8e6e4b1112 Add [NEON_SMGR] to all messages produced by Neon exrtension 2023-12-12 15:55:17 +02:00
John Spray
818290b7b6 pgxn: amend key hashing 2023-12-12 15:55:17 +02:00
Konstantin Knizhnik
7effdf448a [see #6052] make connection logging shard-aware 2023-12-12 15:55:17 +02:00
Konstantin Knizhnik
ffc2145cfb Fix shard map reload synchronization 2023-12-12 15:55:17 +02:00
Konstantin Knizhnik
7bad8e80fb Fix shard map reload mechanism 2023-12-12 15:55:17 +02:00
Konstantin Knizhnik
d3f7cf428e Load shard map only at postmaster 2023-12-12 15:55:17 +02:00
Konstantin Knizhnik
0556364f21 Fix comments 2023-12-12 15:55:17 +02:00
Konstantin Knizhnik
3b2e84fddb Do not deop PS connections of config reload if connection strings are not changed 2023-12-12 15:55:17 +02:00
Konstantin Knizhnik
3705049a24 Fix shard hash caclulation 2023-12-12 15:55:17 +02:00
Konstantin Knizhnik
fd65d0eb80 Minor refectoring 2023-12-12 15:55:17 +02:00
Konstantin Knizhnik
df938c889d Add neon.stripe_size 2023-12-12 15:55:17 +02:00
Konstantin Knizhnik
8c1896be4e Undo occsional changed in control_place_connector.c 2023-12-12 15:55:17 +02:00
Konstantin Knizhnik
52cc1f8ede Merge with main 2023-12-12 15:55:17 +02:00
Konstantin Knizhnik
845846695c Load shardmap from postgresql.conf 2023-12-12 15:55:17 +02:00
Konstantin Knizhnik
4274807eda Take in account stripe size when calculating shard hash number 2023-12-12 15:55:17 +02:00
Konstantin Knizhnik
e10515f3af Add support for PS shardoing in compute 2023-12-12 15:55:17 +02:00
15 changed files with 684 additions and 307 deletions

View File

@@ -5,6 +5,7 @@ use std::{
};
use byteorder::{BigEndian, ReadBytesExt};
use postgres_ffi::BLCKSZ;
use serde::{Deserialize, Serialize};
use serde_with::serde_as;
use strum_macros;
@@ -570,6 +571,7 @@ pub enum PagestreamFeMessage {
Nblocks(PagestreamNblocksRequest),
GetPage(PagestreamGetPageRequest),
DbSize(PagestreamDbSizeRequest),
GetSlruSegment(PagestreamGetSlruSegmentRequest),
}
// Wrapped in libpq CopyData
@@ -579,6 +581,7 @@ pub enum PagestreamBeMessage {
GetPage(PagestreamGetPageResponse),
Error(PagestreamErrorResponse),
DbSize(PagestreamDbSizeResponse),
GetSlruSegment(PagestreamGetSlruSegmentResponse),
}
#[derive(Debug, PartialEq, Eq)]
@@ -610,6 +613,14 @@ pub struct PagestreamDbSizeRequest {
pub dbnode: u32,
}
#[derive(Debug, PartialEq, Eq)]
pub struct PagestreamGetSlruSegmentRequest {
pub latest: bool,
pub lsn: Lsn,
pub kind: u8,
pub segno: u32,
}
#[derive(Debug)]
pub struct PagestreamExistsResponse {
pub exists: bool,
@@ -625,6 +636,11 @@ pub struct PagestreamGetPageResponse {
pub page: Bytes,
}
#[derive(Debug)]
pub struct PagestreamGetSlruSegmentResponse {
pub segment: Bytes,
}
#[derive(Debug)]
pub struct PagestreamErrorResponse {
pub message: String,
@@ -677,6 +693,14 @@ impl PagestreamFeMessage {
bytes.put_u64(req.lsn.0);
bytes.put_u32(req.dbnode);
}
Self::GetSlruSegment(req) => {
bytes.put_u8(4);
bytes.put_u8(u8::from(req.latest));
bytes.put_u64(req.lsn.0);
bytes.put_u8(req.kind);
bytes.put_u32(req.segno);
}
}
bytes.into()
@@ -727,6 +751,14 @@ impl PagestreamFeMessage {
lsn: Lsn::from(body.read_u64::<BigEndian>()?),
dbnode: body.read_u32::<BigEndian>()?,
})),
4 => Ok(PagestreamFeMessage::GetSlruSegment(
PagestreamGetSlruSegmentRequest {
latest: body.read_u8()? != 0,
lsn: Lsn::from(body.read_u64::<BigEndian>()?),
kind: body.read_u8()?,
segno: body.read_u32::<BigEndian>()?,
},
)),
_ => bail!("unknown smgr message tag: {:?}", msg_tag),
}
}
@@ -761,6 +793,12 @@ impl PagestreamBeMessage {
bytes.put_u8(104); /* tag from pagestore_client.h */
bytes.put_i64(resp.db_size);
}
Self::GetSlruSegment(resp) => {
bytes.put_u8(105); /* tag from pagestore_client.h */
bytes.put_u32((resp.segment.len() / BLCKSZ as usize) as u32);
bytes.put(&resp.segment[..]);
}
}
bytes.into()

View File

@@ -108,9 +108,22 @@ impl RelTag {
/// These files are divided into segments, which are divided into
/// pages of the same BLCKSZ as used for relation files.
///
#[derive(Debug, Clone, Copy, Hash, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
#[derive(
Debug,
Clone,
Copy,
strum_macros::FromRepr,
Hash,
Serialize,
Deserialize,
PartialEq,
Eq,
PartialOrd,
Ord,
)]
#[repr(u8)]
pub enum SlruKind {
Clog,
Clog = 0,
MultiXactMembers,
MultiXactOffsets,
}

View File

@@ -139,6 +139,8 @@ where
async fn send_tarball(mut self) -> anyhow::Result<()> {
// TODO include checksum
let on_demand_slru_download = true; // TODO: should it be feature flag, config parameter or whatever else ?
// Create pgdata subdirs structure
for dir in PGDATA_SUBDIRS.iter() {
let header = new_tar_header_dir(dir)?;
@@ -165,19 +167,20 @@ where
.context("could not add config file to basebackup tarball")?;
}
}
// Gather non-relational files from object storage pages.
for kind in [
SlruKind::Clog,
SlruKind::MultiXactOffsets,
SlruKind::MultiXactMembers,
] {
for segno in self
.timeline
.list_slru_segments(kind, self.lsn, self.ctx)
.await?
{
self.add_slru_segment(kind, segno).await?;
if !on_demand_slru_download {
// Gather non-relational files from object storage pages.
for kind in [
SlruKind::Clog,
SlruKind::MultiXactOffsets,
SlruKind::MultiXactMembers,
] {
for segno in self
.timeline
.list_slru_segments(kind, self.lsn, self.ctx)
.await?
{
self.add_slru_segment(kind, segno).await?;
}
}
}

View File

@@ -961,6 +961,7 @@ pub enum SmgrQueryType {
GetRelSize,
GetPageAtLsn,
GetDbSize,
GetSlruSegment,
}
#[derive(Debug)]
@@ -1030,6 +1031,7 @@ mod smgr_query_time_tests {
(GetRelSize, "get_rel_size"),
(GetPageAtLsn, "get_page_at_lsn"),
(GetDbSize, "get_db_size"),
(GetSlruSegment, "get_slru_segment"),
];
for (op, expect) in expect {
let actual: &'static str = op.into();

View File

@@ -19,7 +19,8 @@ use pageserver_api::models::{
PagestreamBeMessage, PagestreamDbSizeRequest, PagestreamDbSizeResponse,
PagestreamErrorResponse, PagestreamExistsRequest, PagestreamExistsResponse,
PagestreamFeMessage, PagestreamGetPageRequest, PagestreamGetPageResponse,
PagestreamNblocksRequest, PagestreamNblocksResponse,
PagestreamGetSlruSegmentRequest, PagestreamGetSlruSegmentResponse, PagestreamNblocksRequest,
PagestreamNblocksResponse,
};
use postgres_backend::{self, is_expected_io_error, AuthType, PostgresBackend, QueryError};
use pq_proto::framed::ConnectionError;
@@ -64,6 +65,7 @@ use crate::tenant::mgr::ShardSelector;
use crate::tenant::Timeline;
use crate::trace::Tracer;
use pageserver_api::reltag::SlruKind;
use postgres_ffi::pg_constants::DEFAULTTABLESPACE_OID;
use postgres_ffi::BLCKSZ;
@@ -518,6 +520,16 @@ impl PageServerHandler {
span,
)
}
PagestreamFeMessage::GetSlruSegment(req) => {
let _timer = metrics.start_timer(metrics::SmgrQueryType::GetSlruSegment);
let span = tracing::info_span!("handle_get_slru_segment_request", kind = %req.kind, segno = %req.segno, req_lsn = %req.lsn);
(
self.handle_get_slru_segment_request(&timeline, &req, &ctx)
.instrument(span.clone())
.await,
span,
)
}
};
if let Err(e) = &response {
@@ -862,6 +874,25 @@ impl PageServerHandler {
}))
}
async fn handle_get_slru_segment_request(
&self,
timeline: &Timeline,
req: &PagestreamGetSlruSegmentRequest,
ctx: &RequestContext,
) -> anyhow::Result<PagestreamBeMessage> {
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
let lsn =
Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn, ctx)
.await?;
let kind = SlruKind::from_repr(req.kind).ok_or(anyhow::anyhow!("invalid SLRU kind"))?;
let segment = timeline.get_slru_segment(kind, req.segno, lsn, ctx).await?;
Ok(PagestreamBeMessage::GetSlruSegment(
PagestreamGetSlruSegmentResponse { segment },
))
}
#[allow(clippy::too_many_arguments)]
#[instrument(skip_all, fields(?lsn, ?prev_lsn, %full_backup))]
async fn handle_basebackup_request<IO>(

View File

@@ -12,7 +12,7 @@ use crate::keyspace::{KeySpace, KeySpaceAccum};
use crate::repository::*;
use crate::walrecord::NeonWalRecord;
use anyhow::Context;
use bytes::{Buf, Bytes};
use bytes::{Buf, Bytes, BytesMut};
use pageserver_api::key::is_rel_block_key;
use pageserver_api::reltag::{RelTag, SlruKind};
use postgres_ffi::relfile_utils::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM};
@@ -314,6 +314,25 @@ impl Timeline {
}
}
/// Get the whole SLRU segment
pub async fn get_slru_segment(
&self,
kind: SlruKind,
segno: u32,
lsn: Lsn,
ctx: &RequestContext,
) -> Result<Bytes, PageReconstructError> {
let n_blocks = self.get_slru_segment_size(kind, segno, lsn, ctx).await?;
let mut segment = BytesMut::with_capacity(n_blocks as usize * BLCKSZ as usize);
for blkno in 0..n_blocks {
let block = self
.get_slru_page_at_lsn(kind, segno, blkno, lsn, ctx)
.await?;
segment.extend_from_slice(&block[..BLCKSZ as usize]);
}
Ok(segment.freeze())
}
/// Look up given SLRU page version.
pub async fn get_slru_page_at_lsn(
&self,

View File

@@ -312,13 +312,13 @@ lfc_change_limit_hook(int newval, void *extra)
Assert(victim->access_count == 0);
#ifdef FALLOC_FL_PUNCH_HOLE
if (fallocate(lfc_desc, FALLOC_FL_PUNCH_HOLE | FALLOC_FL_KEEP_SIZE, (off_t) victim->offset * BLOCKS_PER_CHUNK * BLCKSZ, BLOCKS_PER_CHUNK * BLCKSZ) < 0)
elog(LOG, "Failed to punch hole in file: %m");
neon_log(LOG, "Failed to punch hole in file: %m");
#endif
hash_search_with_hash_value(lfc_hash, &victim->key, victim->hash, HASH_REMOVE, NULL);
lfc_ctl->used -= 1;
}
lfc_ctl->limit = new_size;
elog(DEBUG1, "set local file cache limit to %d", new_size);
neon_log(DEBUG1, "set local file cache limit to %d", new_size);
LWLockRelease(lfc_lock);
}
@@ -331,7 +331,7 @@ lfc_init(void)
* shared_preload_libraries.
*/
if (!process_shared_preload_libraries_in_progress)
elog(ERROR, "Neon module should be loaded via shared_preload_libraries");
neon_log(ERROR, "Neon module should be loaded via shared_preload_libraries");
DefineCustomIntVariable("neon.max_file_cache_size",
@@ -647,7 +647,7 @@ lfc_write(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, const void
Assert(victim->access_count == 0);
entry->offset = victim->offset; /* grab victim's chunk */
hash_search_with_hash_value(lfc_hash, &victim->key, victim->hash, HASH_REMOVE, NULL);
elog(DEBUG2, "Swap file cache page");
neon_log(DEBUG2, "Swap file cache page");
}
else
{
@@ -850,10 +850,10 @@ local_cache_pages(PG_FUNCTION_ARGS)
* wrong) function definition though.
*/
if (get_call_result_type(fcinfo, NULL, &expected_tupledesc) != TYPEFUNC_COMPOSITE)
elog(ERROR, "return type must be a row type");
neon_log(ERROR, "return type must be a row type");
if (expected_tupledesc->natts != NUM_LOCALCACHE_PAGES_ELEM)
elog(ERROR, "incorrect number of output arguments");
neon_log(ERROR, "incorrect number of output arguments");
/* Construct a tuple descriptor for the result rows. */
tupledesc = CreateTemplateTupleDesc(expected_tupledesc->natts);

View File

@@ -18,6 +18,7 @@
#include "fmgr.h"
#include "access/xlog.h"
#include "access/xlogutils.h"
#include "common/hashfn.h"
#include "storage/buf_internals.h"
#include "storage/lwlock.h"
#include "storage/ipc.h"
@@ -36,22 +37,12 @@
#include "neon.h"
#include "walproposer.h"
#include "neon_utils.h"
#include "control_plane_connector.h"
#define PageStoreTrace DEBUG5
#define RECONNECT_INTERVAL_USEC 1000000
bool connected = false;
PGconn *pageserver_conn = NULL;
/*
* WaitEventSet containing:
* - WL_SOCKET_READABLE on pageserver_conn,
* - WL_LATCH_SET on MyLatch, and
* - WL_EXIT_ON_PM_DEATH.
*/
WaitEventSet *pageserver_conn_wes = NULL;
/* GUCs */
char *neon_timeline;
char *neon_tenant;
@@ -64,87 +55,176 @@ int flush_every_n_requests = 8;
int n_reconnect_attempts = 0;
int max_reconnect_attempts = 60;
int stripe_size;
#define MAX_PAGESERVER_CONNSTRING_SIZE 256
bool (*old_redo_read_buffer_filter) (XLogReaderState *record, uint8 block_id) = NULL;
static bool pageserver_flush(shardno_t shard_no);
static void pageserver_disconnect(shardno_t shard_no);
static void AssignPageserverConnstring(const char *newval, void *extra);
static shmem_startup_hook_type prev_shmem_startup_hook;
#if PG_VERSION_NUM>=150000
static shmem_request_hook_type prev_shmem_request_hook;
#endif
typedef struct
{
LWLockId lock;
pg_atomic_uint64 update_counter;
char pageserver_connstring[MAX_PAGESERVER_CONNSTRING_SIZE];
} PagestoreShmemState;
size_t n_shards;
pg_atomic_uint64 begin_update_counter;
pg_atomic_uint64 end_update_counter;
char shard_connstr[MAX_SHARDS][MAX_PS_CONNSTR_LEN];
} ShardMap;
#if PG_VERSION_NUM >= 150000
static shmem_request_hook_type prev_shmem_request_hook = NULL;
static void walproposer_shmem_request(void);
static ShardMap* shard_map;
static uint64 shard_map_update_counter;
typedef struct
{
/*
* Connection for each shard
*/
PGconn *conn;
/*
* WaitEventSet containing:
* - WL_SOCKET_READABLE on pageserver_conn,
* - WL_LATCH_SET on MyLatch, and
* - WL_EXIT_ON_PM_DEATH.
*/
WaitEventSet *wes;
} PageServer;
static PageServer page_servers[MAX_SHARDS];
static shardno_t max_attached_shard_no;
static void
psm_shmem_startup(void)
{
bool found;
if (prev_shmem_startup_hook)
{
prev_shmem_startup_hook();
}
LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);
shard_map = (ShardMap*)ShmemInitStruct("shard_map", sizeof(ShardMap), &found);
if (!found)
{
shard_map->n_shards = 0;
pg_atomic_init_u64(&shard_map->begin_update_counter, 0);
pg_atomic_init_u64(&shard_map->end_update_counter, 0);
AssignPageserverConnstring(page_server_connstring, NULL);
}
LWLockRelease(AddinShmemInitLock);
}
static void
psm_shmem_request(void)
{
#if PG_VERSION_NUM>=150000
if (prev_shmem_request_hook)
prev_shmem_request_hook();
#endif
static shmem_startup_hook_type prev_shmem_startup_hook;
static PagestoreShmemState *pagestore_shared;
static uint64 pagestore_local_counter = 0;
static char local_pageserver_connstring[MAX_PAGESERVER_CONNSTRING_SIZE];
bool (*old_redo_read_buffer_filter) (XLogReaderState *record, uint8 block_id) = NULL;
static bool pageserver_flush(void);
static void pageserver_disconnect(void);
static bool
PagestoreShmemIsValid()
{
return pagestore_shared && UsedShmemSegAddr;
}
static bool
CheckPageserverConnstring(char **newval, void **extra, GucSource source)
{
return strlen(*newval) < MAX_PAGESERVER_CONNSTRING_SIZE;
RequestAddinShmemSpace(sizeof(ShardMap));
}
static void
AssignPageserverConnstring(const char *newval, void *extra)
psm_init(void)
{
if (!PagestoreShmemIsValid())
return;
LWLockAcquire(pagestore_shared->lock, LW_EXCLUSIVE);
strlcpy(pagestore_shared->pageserver_connstring, newval, MAX_PAGESERVER_CONNSTRING_SIZE);
pg_atomic_fetch_add_u64(&pagestore_shared->update_counter, 1);
LWLockRelease(pagestore_shared->lock);
prev_shmem_startup_hook = shmem_startup_hook;
shmem_startup_hook = psm_shmem_startup;
#if PG_VERSION_NUM>=150000
prev_shmem_request_hook = shmem_request_hook;
shmem_request_hook = psm_shmem_request;
#else
psm_shmem_request();
#endif
}
/*
* Reload page map if needed and return number of shards and connection string for the specified shard
*/
static shardno_t
load_shard_map(shardno_t shard_no, char* connstr)
{
shardno_t n_shards;
uint64 begin_update_counter;
uint64 end_update_counter;
/*
* There is race condition here between backendc and postmaster which can update shard map.
* We recheck update couner after copying connection string to check that configuration was not changed.
*/
do
{
begin_update_counter = pg_atomic_read_u64(&shard_map->begin_update_counter);
end_update_counter = pg_atomic_read_u64(&shard_map->end_update_counter);
n_shards = shard_map->n_shards;
if (shard_no >= n_shards)
neon_log(ERROR, "Shard %d is greater or equal than number of shards %d", shard_no, n_shards);
if (connstr)
strncpy(connstr, shard_map->shard_connstr[shard_no], MAX_PS_CONNSTR_LEN);
}
while (begin_update_counter != end_update_counter
|| begin_update_counter != pg_atomic_read_u64(&shard_map->begin_update_counter)
|| end_update_counter != pg_atomic_read_u64(&shard_map->end_update_counter));
if (shard_map_update_counter != end_update_counter)
{
/* Reset all connections if connection strings are changed */
for (shardno_t i = 0; i < max_attached_shard_no; i++)
{
if (page_servers[i].conn)
pageserver_disconnect(i);
}
max_attached_shard_no = 0;
shard_map_update_counter = end_update_counter;
}
return n_shards;
}
#define MB (1024*1024)
shardno_t
get_shard_number(BufferTag* tag)
{
shardno_t n_shards = load_shard_map(0, NULL);
uint32 hash;
#if PG_MAJORVERSION_NUM < 16
hash = murmurhash32(tag->rnode.relNode);
hash = hash_combine(hash, murmurhash32(tag->blockNum/(MB/BLCKSZ)/stripe_size));
#else
hash = murmurhash32(tag->relNumber);
hash = hash_combine(hash, murmurhash32(tag->blockNum/(MB/BLCKSZ)/stripe_size));
#endif
return hash % n_shards;
}
static bool
CheckConnstringUpdated()
{
if (!PagestoreShmemIsValid())
return false;
return pagestore_local_counter < pg_atomic_read_u64(&pagestore_shared->update_counter);
}
static void
ReloadConnstring()
{
if (!PagestoreShmemIsValid())
return;
LWLockAcquire(pagestore_shared->lock, LW_SHARED);
strlcpy(local_pageserver_connstring, pagestore_shared->pageserver_connstring, sizeof(local_pageserver_connstring));
pagestore_local_counter = pg_atomic_read_u64(&pagestore_shared->update_counter);
LWLockRelease(pagestore_shared->lock);
}
static bool
pageserver_connect(int elevel)
pageserver_connect(shardno_t shard_no, int elevel)
{
char *query;
int ret;
const char *keywords[3];
const char *values[3];
int n;
PGconn* conn;
WaitEventSet *wes;
char connstr[MAX_PS_CONNSTR_LEN];
Assert(!connected);
Assert(page_servers[shard_no].conn == NULL);
if (CheckConnstringUpdated())
{
ReloadConnstring();
}
(void)load_shard_map(shard_no, connstr); /* refresh page map if needed */
/*
* Connect using the connection string we got from the
@@ -164,50 +244,47 @@ pageserver_connect(int elevel)
n++;
}
keywords[n] = "dbname";
values[n] = local_pageserver_connstring;
values[n] = connstr;
n++;
keywords[n] = NULL;
values[n] = NULL;
n++;
pageserver_conn = PQconnectdbParams(keywords, values, 1);
conn = PQconnectdbParams(keywords, values, 1);
if (PQstatus(pageserver_conn) == CONNECTION_BAD)
if (PQstatus(conn) == CONNECTION_BAD)
{
char *msg = pchomp(PQerrorMessage(pageserver_conn));
char *msg = pchomp(PQerrorMessage(conn));
PQfinish(pageserver_conn);
pageserver_conn = NULL;
PQfinish(conn);
ereport(elevel,
(errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
errmsg(NEON_TAG "could not establish connection to pageserver"),
errmsg(NEON_TAG "[shard %d] could not establish connection to pageserver", shard_no),
errdetail_internal("%s", msg)));
return false;
}
query = psprintf("pagestream %s %s", neon_tenant, neon_timeline);
ret = PQsendQuery(pageserver_conn, query);
ret = PQsendQuery(conn, query);
if (ret != 1)
{
PQfinish(pageserver_conn);
pageserver_conn = NULL;
neon_log(elevel, "could not send pagestream command to pageserver");
PQfinish(conn);
neon_shard_log(shard_no, elevel, "could not send pagestream command to pageserver");
return false;
}
pageserver_conn_wes = CreateWaitEventSet(TopMemoryContext, 3);
AddWaitEventToSet(pageserver_conn_wes, WL_LATCH_SET, PGINVALID_SOCKET,
wes = CreateWaitEventSet(TopMemoryContext, 3);
AddWaitEventToSet(wes, WL_LATCH_SET, PGINVALID_SOCKET,
MyLatch, NULL);
AddWaitEventToSet(pageserver_conn_wes, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET,
AddWaitEventToSet(wes, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET,
NULL, NULL);
AddWaitEventToSet(pageserver_conn_wes, WL_SOCKET_READABLE, PQsocket(pageserver_conn), NULL, NULL);
AddWaitEventToSet(wes, WL_SOCKET_READABLE, PQsocket(conn), NULL, NULL);
while (PQisBusy(pageserver_conn))
while (PQisBusy(conn))
{
WaitEvent event;
/* Sleep until there's something to do */
(void) WaitEventSetWait(pageserver_conn_wes, -1L, &event, 1, PG_WAIT_EXTENSION);
(void) WaitEventSetWait(wes, -1L, &event, 1, PG_WAIT_EXTENSION);
ResetLatch(MyLatch);
CHECK_FOR_INTERRUPTS();
@@ -215,25 +292,25 @@ pageserver_connect(int elevel)
/* Data available in socket? */
if (event.events & WL_SOCKET_READABLE)
{
if (!PQconsumeInput(pageserver_conn))
if (!PQconsumeInput(conn))
{
char *msg = pchomp(PQerrorMessage(pageserver_conn));
char *msg = pchomp(PQerrorMessage(conn));
PQfinish(pageserver_conn);
pageserver_conn = NULL;
FreeWaitEventSet(pageserver_conn_wes);
pageserver_conn_wes = NULL;
PQfinish(conn);
FreeWaitEventSet(wes);
neon_log(elevel, "could not complete handshake with pageserver: %s",
msg);
neon_shard_log(shard_no, elevel, "could not complete handshake with pageserver: %s",
msg);
return false;
}
}
}
neon_log(LOG, "libpagestore: connected to '%s'", page_server_connstring);
neon_shard_log(shard_no, LOG, "libpagestore: connected to '%s'", connstr);
page_servers[shard_no].conn = conn;
page_servers[shard_no].wes = wes;
max_attached_shard_no = Max(shard_no+1, max_attached_shard_no);
connected = true;
return true;
}
@@ -241,10 +318,10 @@ pageserver_connect(int elevel)
* A wrapper around PQgetCopyData that checks for interrupts while sleeping.
*/
static int
call_PQgetCopyData(char **buffer)
call_PQgetCopyData(shardno_t shard_no, char **buffer)
{
int ret;
PGconn* pageserver_conn = page_servers[shard_no].conn;
retry:
ret = PQgetCopyData(pageserver_conn, buffer, 1 /* async */ );
@@ -253,7 +330,7 @@ retry:
WaitEvent event;
/* Sleep until there's something to do */
(void) WaitEventSetWait(pageserver_conn_wes, -1L, &event, 1, PG_WAIT_EXTENSION);
(void) WaitEventSetWait(page_servers[shard_no].wes, -1L, &event, 1, PG_WAIT_EXTENSION);
ResetLatch(MyLatch);
CHECK_FOR_INTERRUPTS();
@@ -265,7 +342,7 @@ retry:
{
char *msg = pchomp(PQerrorMessage(pageserver_conn));
neon_log(LOG, "could not get response from pageserver: %s", msg);
neon_shard_log(shard_no, LOG, "could not get response from pageserver: %s", msg);
pfree(msg);
return -1;
}
@@ -279,7 +356,7 @@ retry:
static void
pageserver_disconnect(void)
pageserver_disconnect(shardno_t shard_no)
{
/*
* If anything goes wrong while we were sending a request, it's not clear
@@ -288,38 +365,32 @@ pageserver_disconnect(void)
* time later after we have already sent a new unrelated request. Close
* the connection to avoid getting confused.
*/
if (connected)
if (page_servers[shard_no].conn)
{
neon_log(LOG, "dropping connection to page server due to error");
PQfinish(pageserver_conn);
pageserver_conn = NULL;
connected = false;
neon_shard_log(shard_no, LOG, "dropping connection to page server due to error");
PQfinish(page_servers[shard_no].conn);
page_servers[shard_no].conn = NULL;
prefetch_on_ps_disconnect();
}
if (pageserver_conn_wes != NULL)
if (page_servers[shard_no].wes != NULL)
{
FreeWaitEventSet(pageserver_conn_wes);
pageserver_conn_wes = NULL;
FreeWaitEventSet(page_servers[shard_no].wes);
page_servers[shard_no].wes = NULL;
}
}
static bool
pageserver_send(NeonRequest *request)
pageserver_send(shardno_t shard_no, NeonRequest *request)
{
StringInfoData req_buff;
if (CheckConnstringUpdated())
{
pageserver_disconnect();
ReloadConnstring();
}
PGconn* pageserver_conn = page_servers[shard_no].conn;
/* If the connection was lost for some reason, reconnect */
if (connected && PQstatus(pageserver_conn) == CONNECTION_BAD)
if (pageserver_conn && PQstatus(pageserver_conn) == CONNECTION_BAD)
{
neon_log(LOG, "pageserver_send disconnect bad connection");
pageserver_disconnect();
neon_shard_log(shard_no, LOG, "pageserver_send disconnect bad connection");
pageserver_disconnect(shard_no);
}
req_buff = nm_pack_request(request);
@@ -333,9 +404,9 @@ pageserver_send(NeonRequest *request)
* https://github.com/neondatabase/neon/issues/1138 So try to reestablish
* connection in case of failure.
*/
if (!connected)
if (!page_servers[shard_no].conn)
{
while (!pageserver_connect(n_reconnect_attempts < max_reconnect_attempts ? LOG : ERROR))
while (!pageserver_connect(shard_no, n_reconnect_attempts < max_reconnect_attempts ? LOG : ERROR))
{
HandleMainLoopInterrupts();
n_reconnect_attempts += 1;
@@ -344,7 +415,9 @@ pageserver_send(NeonRequest *request)
n_reconnect_attempts = 0;
}
/*
pageserver_conn = page_servers[shard_no].conn;
/*
* Send request.
*
* In principle, this could block if the output buffer is full, and we
@@ -355,9 +428,8 @@ pageserver_send(NeonRequest *request)
if (PQputCopyData(pageserver_conn, req_buff.data, req_buff.len) <= 0)
{
char *msg = pchomp(PQerrorMessage(pageserver_conn));
pageserver_disconnect();
neon_log(LOG, "pageserver_send disconnect because failed to send page request (try to reconnect): %s", msg);
pageserver_disconnect(shard_no);
neon_shard_log(shard_no, LOG, "pageserver_send disconnect because failed to send page request (try to reconnect): %s", msg);
pfree(msg);
pfree(req_buff.data);
return false;
@@ -369,19 +441,19 @@ pageserver_send(NeonRequest *request)
{
char *msg = nm_to_string((NeonMessage *) request);
neon_log(PageStoreTrace, "sent request: %s", msg);
neon_shard_log(shard_no, PageStoreTrace, "sent request: %s", msg);
pfree(msg);
}
return true;
}
static NeonResponse *
pageserver_receive(void)
pageserver_receive(shardno_t shard_no)
{
StringInfoData resp_buff;
NeonResponse *resp;
if (!connected)
PGconn* pageserver_conn = page_servers[shard_no].conn;
if (!pageserver_conn)
return NULL;
PG_TRY();
@@ -389,7 +461,7 @@ pageserver_receive(void)
/* read response */
int rc;
rc = call_PQgetCopyData(&resp_buff.data);
rc = call_PQgetCopyData(shard_no, &resp_buff.data);
if (rc >= 0)
{
resp_buff.len = rc;
@@ -401,33 +473,33 @@ pageserver_receive(void)
{
char *msg = nm_to_string((NeonMessage *) resp);
neon_log(PageStoreTrace, "got response: %s", msg);
neon_shard_log(shard_no, PageStoreTrace, "got response: %s", msg);
pfree(msg);
}
}
else if (rc == -1)
{
neon_log(LOG, "pageserver_receive disconnect because call_PQgetCopyData returns -1: %s", pchomp(PQerrorMessage(pageserver_conn)));
pageserver_disconnect();
neon_shard_log(shard_no, LOG, "pageserver_receive disconnect because call_PQgetCopyData returns -1: %s", pchomp(PQerrorMessage(pageserver_conn)));
pageserver_disconnect(shard_no);
resp = NULL;
}
else if (rc == -2)
{
char *msg = pchomp(PQerrorMessage(pageserver_conn));
pageserver_disconnect();
neon_log(ERROR, "pageserver_receive disconnect because could not read COPY data: %s", msg);
pageserver_disconnect(shard_no);
neon_shard_log(shard_no, ERROR, "pageserver_receive disconnect because could not read COPY data: %s", msg);
}
else
{
pageserver_disconnect();
neon_log(ERROR, "pageserver_receive disconnect because unexpected PQgetCopyData return value: %d", rc);
pageserver_disconnect(shard_no);
neon_shard_log(shard_no, ERROR, "pageserver_receive disconnect because unexpected PQgetCopyData return value: %d", rc);
}
}
PG_CATCH();
{
neon_log(LOG, "pageserver_receive disconnect due to caught exception");
pageserver_disconnect();
neon_shard_log(shard_no, LOG, "pageserver_receive disconnect due to caught exception");
pageserver_disconnect(shard_no);
PG_RE_THROW();
}
PG_END_TRY();
@@ -437,11 +509,12 @@ pageserver_receive(void)
static bool
pageserver_flush(void)
pageserver_flush(shardno_t shard_no)
{
if (!connected)
PGconn* pageserver_conn = page_servers[shard_no].conn;
if (!pageserver_conn)
{
neon_log(WARNING, "Tried to flush while disconnected");
neon_shard_log(shard_no, WARNING, "Tried to flush while disconnected");
}
else
{
@@ -449,8 +522,8 @@ pageserver_flush(void)
{
char *msg = pchomp(PQerrorMessage(pageserver_conn));
pageserver_disconnect();
neon_log(LOG, "pageserver_flush disconnect because failed to flush page requests: %s", msg);
pageserver_disconnect(shard_no);
neon_shard_log(shard_no, LOG, "pageserver_flush disconnect because failed to flush page requests: %s", msg);
pfree(msg);
return false;
}
@@ -473,63 +546,61 @@ check_neon_id(char **newval, void **extra, GucSource source)
return **newval == '\0' || HexDecodeString(id, *newval, 16);
}
static Size
PagestoreShmemSize(void)
static void
AssignPageserverConnstring(const char *newval, void *extra)
{
return sizeof(PagestoreShmemState);
}
static bool
PagestoreShmemInit(void)
{
bool found;
LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);
pagestore_shared = ShmemInitStruct("libpagestore shared state",
PagestoreShmemSize(),
&found);
if (!found)
/*
* Load shard map only at Postmaster.
* If old page server is not available, then backends can be blocked in attempts to reconnect to it and do not reload config in this loop
*/
if (shard_map != NULL && UsedShmemSegAddr != NULL && (MyProcPid == PostmasterPid || shard_map->n_shards == 0))
{
pagestore_shared->lock = &(GetNamedLWLockTranche("neon_libpagestore")->lock);
pg_atomic_init_u64(&pagestore_shared->update_counter, 0);
AssignPageserverConnstring(page_server_connstring, NULL);
char const* shard_connstr = newval;
char const* sep;
size_t connstr_len;
int i = 0;
bool shard_map_changed = false;
do
{
sep = strchr(shard_connstr, ',');
connstr_len = sep != NULL ? sep - shard_connstr : strlen(shard_connstr);
if (connstr_len == 0)
break; /* trailing comma */
if (i >= MAX_SHARDS)
{
neon_log(LOG, "Too many shards");
return;
}
if (connstr_len >= MAX_PS_CONNSTR_LEN)
{
neon_log(LOG, "Connection string too long");
return;
}
if (i >= shard_map->n_shards ||
strcmp(shard_map->shard_connstr[i], shard_connstr) != 0)
{
if (!shard_map_changed)
{
pg_atomic_add_fetch_u64(&shard_map->begin_update_counter, 1);
shard_map_changed = true;
}
memcpy(shard_map->shard_connstr[i], shard_connstr, connstr_len+1);
}
shard_connstr = sep + 1;
i += 1;
} while (sep != NULL);
if (i == 0)
{
neon_log(LOG, "No shards were specified");
return;
}
if (shard_map_changed)
{
shard_map->n_shards = i;
pg_atomic_add_fetch_u64(&shard_map->end_update_counter, 1);
}
}
LWLockRelease(AddinShmemInitLock);
return found;
}
static void
pagestore_shmem_startup_hook(void)
{
if (prev_shmem_startup_hook)
prev_shmem_startup_hook();
PagestoreShmemInit();
}
static void
pagestore_shmem_request(void)
{
#if PG_VERSION_NUM >= 150000
if (prev_shmem_request_hook)
prev_shmem_request_hook();
#endif
RequestAddinShmemSpace(PagestoreShmemSize());
RequestNamedLWLockTranche("neon_libpagestore", 1);
}
static void
pagestore_prepare_shmem(void)
{
#if PG_VERSION_NUM >= 150000
prev_shmem_request_hook = shmem_request_hook;
shmem_request_hook = pagestore_shmem_request;
#else
pagestore_shmem_request();
#endif
prev_shmem_startup_hook = shmem_startup_hook;
shmem_startup_hook = pagestore_shmem_startup_hook;
}
/*
@@ -538,8 +609,6 @@ pagestore_prepare_shmem(void)
void
pg_init_libpagestore(void)
{
pagestore_prepare_shmem();
DefineCustomStringVariable("neon.pageserver_connstring",
"connection string to the page server",
NULL,
@@ -547,7 +616,7 @@ pg_init_libpagestore(void)
"",
PGC_SIGHUP,
0, /* no flags required */
CheckPageserverConnstring, AssignPageserverConnstring, NULL);
NULL, AssignPageserverConnstring, NULL);
DefineCustomStringVariable("neon.timeline_id",
"Neon timeline_id the server is running on",
@@ -567,6 +636,15 @@ pg_init_libpagestore(void)
0, /* no flags required */
check_neon_id, NULL, NULL);
DefineCustomIntVariable("neon.stripe_size",
"sharding sripe size",
NULL,
&stripe_size,
256, 1, INT_MAX,
PGC_SIGHUP,
GUC_UNIT_MB,
NULL, NULL, NULL);
DefineCustomIntVariable("neon.max_cluster_size",
"cluster size limit",
NULL,
@@ -632,4 +710,5 @@ pg_init_libpagestore(void)
}
lfc_init();
psm_init();
}

View File

@@ -16,16 +16,21 @@
#include "postgres.h"
#include "neon_pgversioncompat.h"
#include "access/slru.h"
#include "access/xlogdefs.h"
#include RELFILEINFO_HDR
#include "storage/block.h"
#include "storage/smgr.h"
#include "storage/buf_internals.h"
#include "lib/stringinfo.h"
#include "libpq/pqformat.h"
#include "utils/memutils.h"
#include "pg_config.h"
#define MAX_SHARDS 128
#define MAX_PS_CONNSTR_LEN 128
typedef enum
{
/* pagestore_client -> pagestore */
@@ -33,6 +38,7 @@ typedef enum
T_NeonNblocksRequest,
T_NeonGetPageRequest,
T_NeonDbSizeRequest,
T_NeonGetSlruSegmentRequest,
/* pagestore -> pagestore_client */
T_NeonExistsResponse = 100,
@@ -40,6 +46,7 @@ typedef enum
T_NeonGetPageResponse,
T_NeonErrorResponse,
T_NeonDbSizeResponse,
T_NeonGetSlruSegmentResponse,
} NeonMessageTag;
/* base struct for c-style inheritance */
@@ -54,6 +61,9 @@ typedef struct
#define neon_log(tag, fmt, ...) ereport(tag, \
(errmsg(NEON_TAG fmt, ##__VA_ARGS__), \
errhidestmt(true), errhidecontext(true), errposition(0), internalerrposition(0)))
#define neon_shard_log(shard_no, tag, fmt, ...) ereport(tag, \
(errmsg(NEON_TAG "[shard %d] " fmt, shard_no, ##__VA_ARGS__), \
errhidestmt(true), errhidecontext(true), errposition(0), internalerrposition(0)))
/*
* supertype of all the Neon*Request structs below
@@ -97,6 +107,13 @@ typedef struct
BlockNumber blkno;
} NeonGetPageRequest;
typedef struct
{
NeonRequest req;
SlruKind kind;
int segno;
} NeonGetSlruSegmentRequest;
/* supertype of all the Neon*Response structs below */
typedef struct
{
@@ -136,6 +153,14 @@ typedef struct
* message */
} NeonErrorResponse;
typedef struct
{
NeonMessageTag tag;
int n_blocks;
char data[BLCKSZ * SLRU_PAGES_PER_SEGMENT];
} NeonGetSlruSegmentResponse;
extern StringInfoData nm_pack_request(NeonRequest *msg);
extern NeonResponse *nm_unpack_response(StringInfo s);
extern char *nm_to_string(NeonMessage *msg);
@@ -144,11 +169,13 @@ extern char *nm_to_string(NeonMessage *msg);
* API
*/
typedef unsigned shardno_t;
typedef struct
{
bool (*send) (NeonRequest *request);
NeonResponse *(*receive) (void);
bool (*flush) (void);
bool (*send) (shardno_t shard_no, NeonRequest * request);
NeonResponse *(*receive) (shardno_t shard_no);
bool (*flush) (shardno_t shard_no);
} page_server_api;
extern void prefetch_on_ps_disconnect(void);
@@ -165,6 +192,8 @@ extern char *neon_tenant;
extern bool wal_redo;
extern int32 max_cluster_size;
extern shardno_t get_shard_number(BufferTag* tag);
extern const f_smgr *smgr_neon(BackendId backend, NRelFileInfo rinfo);
extern void smgr_init_neon(void);
extern void readahead_buffer_resize(int newsize, void *extra);

View File

@@ -168,6 +168,7 @@ typedef struct PrefetchRequest
XLogRecPtr actual_request_lsn;
NeonResponse *response; /* may be null */
PrefetchStatus status;
shardno_t shard_no;
uint64 my_ring_index;
} PrefetchRequest;
@@ -235,7 +236,9 @@ typedef struct PrefetchState
* also unused */
/* the buffers */
prfh_hash *prf_hash;
prfh_hash *prf_hash;
int max_shard_no;
uint8 shard_bitmap[(MAX_SHARDS + 7)/8];
PrefetchRequest prf_buffer[]; /* prefetch buffers */
} PrefetchState;
@@ -323,6 +326,7 @@ compact_prefetch_buffers(void)
Assert(target_slot->status == PRFS_UNUSED);
target_slot->buftag = source_slot->buftag;
target_slot->shard_no = source_slot->shard_no;
target_slot->status = source_slot->status;
target_slot->response = source_slot->response;
target_slot->effective_request_lsn = source_slot->effective_request_lsn;
@@ -490,6 +494,23 @@ prefetch_cleanup_trailing_unused(void)
}
}
static bool
prefetch_flush_requests(void)
{
for (shardno_t shard_no = 0; shard_no < MyPState->max_shard_no; shard_no++)
{
if (MyPState->shard_bitmap[shard_no >> 3] & (1 << (shard_no & 7)))
{
if (!page_server->flush(shard_no))
return false;
MyPState->shard_bitmap[shard_no >> 3] &= ~(1 << (shard_no & 7));
}
}
MyPState->max_shard_no = 0;
return true;
}
/*
* Wait for slot of ring_index to have received its response.
* The caller is responsible for making sure the request buffer is flushed.
@@ -505,7 +526,7 @@ prefetch_wait_for(uint64 ring_index)
if (MyPState->ring_flush <= ring_index &&
MyPState->ring_unused > MyPState->ring_flush)
{
if (!page_server->flush())
if (!prefetch_flush_requests())
return false;
MyPState->ring_flush = MyPState->ring_unused;
}
@@ -543,7 +564,7 @@ prefetch_read(PrefetchRequest *slot)
Assert(slot->my_ring_index == MyPState->ring_receive);
old = MemoryContextSwitchTo(MyPState->errctx);
response = (NeonResponse *) page_server->receive();
response = (NeonResponse *) page_server->receive(slot->shard_no);
MemoryContextSwitchTo(old);
if (response)
{
@@ -700,12 +721,14 @@ prefetch_do_request(PrefetchRequest *slot, bool *force_latest, XLogRecPtr *force
Assert(slot->response == NULL);
Assert(slot->my_ring_index == MyPState->ring_unused);
while (!page_server->send((NeonRequest *) &request));
while (!page_server->send(slot->shard_no, (NeonRequest *) &request));
/* update prefetch state */
MyPState->n_requests_inflight += 1;
MyPState->n_unused -= 1;
MyPState->ring_unused += 1;
MyPState->shard_bitmap[slot->shard_no >> 3] |= 1 << (slot->shard_no & 7);
MyPState->max_shard_no = Max(slot->shard_no+1, MyPState->max_shard_no);
/* update slot state */
slot->status = PRFS_REQUESTED;
@@ -876,6 +899,7 @@ Retry:
* function reads the buffer tag from the slot.
*/
slot->buftag = tag;
slot->shard_no = get_shard_number(&tag);
slot->my_ring_index = ring_index;
prefetch_do_request(slot, force_latest, force_lsn);
@@ -886,7 +910,7 @@ Retry:
if (flush_every_n_requests > 0 &&
MyPState->ring_unused - MyPState->ring_flush >= flush_every_n_requests)
{
if (!page_server->flush())
if (!prefetch_flush_requests())
{
/*
* Prefetch set is reset in case of error, so we should try to
@@ -904,13 +928,44 @@ static NeonResponse *
page_server_request(void const *req)
{
NeonResponse *resp;
BufferTag tag = {0};
shardno_t shard_no;
switch (((NeonRequest *) req)->tag)
{
case T_NeonExistsRequest:
CopyNRelFileInfoToBufTag(tag, ((NeonExistsRequest *) req)->rinfo);
break;
case T_NeonNblocksRequest:
CopyNRelFileInfoToBufTag(tag, ((NeonNblocksRequest *) req)->rinfo);
break;
case T_NeonDbSizeRequest:
NInfoGetDbOid(BufTagGetNRelFileInfo(tag)) = ((NeonDbSizeRequest *) req)->dbNode;
break;
case T_NeonGetPageRequest:
CopyNRelFileInfoToBufTag(tag, ((NeonGetPageRequest *) req)->rinfo);
tag.blockNum = ((NeonGetPageRequest *) req)->blkno;
break;
default:
neon_log(ERROR, "Unexpected request tag: %d", ((NeonRequest *) req)->tag);
}
shard_no = get_shard_number(&tag);
/*
* TODO: temporary workarround - we stream all WAL only to shard 0, so metadata and forks other than main
* should be requested from shard 0. We still need to call get_shard_no() to check if shard map is up-to-date
*/
if (((NeonRequest *) req)->tag != T_NeonGetPageRequest || ((NeonGetPageRequest *) req)->forknum != MAIN_FORKNUM)
{
shard_no = 0;
}
do
{
while (!page_server->send((NeonRequest *) req) || !page_server->flush());
MyPState->ring_flush = MyPState->ring_unused;
while (!page_server->send(shard_no, (NeonRequest *) req) || !page_server->flush(shard_no));
consume_prefetch_responses();
resp = page_server->receive();
resp = page_server->receive(shard_no);
} while (resp == NULL);
return resp;
@@ -979,14 +1034,27 @@ nm_pack_request(NeonRequest *msg)
break;
}
case T_NeonGetSlruSegmentRequest:
{
NeonGetSlruSegmentRequest *msg_req = (NeonGetSlruSegmentRequest *) msg;
pq_sendbyte(&s, msg_req->req.latest);
pq_sendint64(&s, msg_req->req.lsn);
pq_sendbyte(&s, msg_req->kind);
pq_sendint32(&s, msg_req->segno);
break;
}
/* pagestore -> pagestore_client. We never need to create these. */
case T_NeonExistsResponse:
case T_NeonNblocksResponse:
case T_NeonGetPageResponse:
case T_NeonErrorResponse:
case T_NeonDbSizeResponse:
case T_NeonGetSlruSegmentResponse:
default:
elog(ERROR, "unexpected neon message tag 0x%02x", msg->tag);
neon_log(ERROR, "unexpected neon message tag 0x%02x", msg->tag);
break;
}
return s;
@@ -1071,6 +1139,20 @@ nm_unpack_response(StringInfo s)
break;
}
case T_NeonGetSlruSegmentResponse:
{
NeonGetSlruSegmentResponse *msg_resp;
int n_blocks = pq_getmsgint(s, 4);
msg_resp = palloc(sizeof(NeonGetSlruSegmentResponse));
msg_resp->tag = tag;
msg_resp->n_blocks = n_blocks;
memcpy(msg_resp->data, pq_getmsgbytes(s, n_blocks * BLCKSZ), n_blocks * BLCKSZ);
pq_getmsgend(s);
resp = (NeonResponse *) msg_resp;
break;
}
/*
* pagestore_client -> pagestore
*
@@ -1080,8 +1162,9 @@ nm_unpack_response(StringInfo s)
case T_NeonNblocksRequest:
case T_NeonGetPageRequest:
case T_NeonDbSizeRequest:
case T_NeonGetSlruSegmentRequest:
default:
elog(ERROR, "unexpected neon message tag 0x%02x", tag);
neon_log(ERROR, "unexpected neon message tag 0x%02x", tag);
break;
}
@@ -1149,7 +1232,18 @@ nm_to_string(NeonMessage *msg)
appendStringInfoChar(&s, '}');
break;
}
case T_NeonGetSlruSegmentRequest:
{
NeonGetSlruSegmentRequest *msg_req = (NeonGetSlruSegmentRequest *) msg;
appendStringInfoString(&s, "{\"type\": \"NeonGetSlruSegmentRequest\"");
appendStringInfo(&s, ", \"kind\": %u", msg_req->kind);
appendStringInfo(&s, ", \"segno\": %u", msg_req->segno);
appendStringInfo(&s, ", \"lsn\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->req.lsn));
appendStringInfo(&s, ", \"latest\": %d", msg_req->req.latest);
appendStringInfoChar(&s, '}');
break;
}
/* pagestore -> pagestore_client */
case T_NeonExistsResponse:
{
@@ -1203,6 +1297,17 @@ nm_to_string(NeonMessage *msg)
msg_resp->db_size);
appendStringInfoChar(&s, '}');
break;
}
case T_NeonGetSlruSegmentResponse:
{
NeonGetSlruSegmentResponse *msg_resp = (NeonGetSlruSegmentResponse *) msg;
appendStringInfoString(&s, "{\"type\": \"NeonGetSlruSegmentResponse\"");
appendStringInfo(&s, ", \"n_blocks\": %u}",
msg_resp->n_blocks);
appendStringInfoChar(&s, '}');
break;
}
@@ -1273,7 +1378,7 @@ neon_wallog_page(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, co
XLogFlush(recptr);
lsn = recptr;
ereport(SmgrTrace,
(errmsg("Page %u of relation %u/%u/%u.%u was force logged. Evicted at lsn=%X/%X",
(errmsg(NEON_TAG "Page %u of relation %u/%u/%u.%u was force logged. Evicted at lsn=%X/%X",
blocknum,
RelFileInfoFmt(InfoFromSMgrRel(reln)),
forknum, LSN_FORMAT_ARGS(lsn))));
@@ -1301,7 +1406,7 @@ neon_wallog_page(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, co
if (PageIsNew((Page) buffer))
{
ereport(SmgrTrace,
(errmsg("Page %u of relation %u/%u/%u.%u is all-zeros",
(errmsg(NEON_TAG "Page %u of relation %u/%u/%u.%u is all-zeros",
blocknum,
RelFileInfoFmt(InfoFromSMgrRel(reln)),
forknum)));
@@ -1309,7 +1414,7 @@ neon_wallog_page(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, co
else if (PageIsEmptyHeapPage((Page) buffer))
{
ereport(SmgrTrace,
(errmsg("Page %u of relation %u/%u/%u.%u is an empty heap page with no LSN",
(errmsg(NEON_TAG "Page %u of relation %u/%u/%u.%u is an empty heap page with no LSN",
blocknum,
RelFileInfoFmt(InfoFromSMgrRel(reln)),
forknum)));
@@ -1317,7 +1422,7 @@ neon_wallog_page(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, co
else
{
ereport(PANIC,
(errmsg("Page %u of relation %u/%u/%u.%u is evicted with zero LSN",
(errmsg(NEON_TAG "Page %u of relation %u/%u/%u.%u is evicted with zero LSN",
blocknum,
RelFileInfoFmt(InfoFromSMgrRel(reln)),
forknum)));
@@ -1326,7 +1431,7 @@ neon_wallog_page(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, co
else
{
ereport(SmgrTrace,
(errmsg("Page %u of relation %u/%u/%u.%u is already wal logged at lsn=%X/%X",
(errmsg(NEON_TAG "Page %u of relation %u/%u/%u.%u is already wal logged at lsn=%X/%X",
blocknum,
RelFileInfoFmt(InfoFromSMgrRel(reln)),
forknum, LSN_FORMAT_ARGS(lsn))));
@@ -1423,7 +1528,7 @@ neon_get_request_lsn(bool *latest, NRelFileInfo rinfo, ForkNumber forknum, Block
lsn = GetLastWrittenLSN(rinfo, forknum, blkno);
lsn = nm_adjust_lsn(lsn);
elog(DEBUG1, "neon_get_request_lsn GetXLogReplayRecPtr %X/%X request lsn 0 ",
neon_log(DEBUG1, "neon_get_request_lsn GetXLogReplayRecPtr %X/%X request lsn 0 ",
(uint32) ((lsn) >> 32), (uint32) (lsn));
}
else
@@ -1438,7 +1543,7 @@ neon_get_request_lsn(bool *latest, NRelFileInfo rinfo, ForkNumber forknum, Block
*latest = true;
lsn = GetLastWrittenLSN(rinfo, forknum, blkno);
Assert(lsn != InvalidXLogRecPtr);
elog(DEBUG1, "neon_get_request_lsn GetLastWrittenLSN lsn %X/%X ",
neon_log(DEBUG1, "neon_get_request_lsn GetLastWrittenLSN lsn %X/%X ",
(uint32) ((lsn) >> 32), (uint32) (lsn));
lsn = nm_adjust_lsn(lsn);
@@ -1458,7 +1563,7 @@ neon_get_request_lsn(bool *latest, NRelFileInfo rinfo, ForkNumber forknum, Block
#endif
if (lsn > flushlsn)
{
elog(DEBUG5, "last-written LSN %X/%X is ahead of last flushed LSN %X/%X",
neon_log(DEBUG5, "last-written LSN %X/%X is ahead of last flushed LSN %X/%X",
(uint32) (lsn >> 32), (uint32) lsn,
(uint32) (flushlsn >> 32), (uint32) flushlsn);
XLogFlush(lsn);
@@ -1502,7 +1607,7 @@ neon_exists(SMgrRelation reln, ForkNumber forkNum)
return mdexists(reln, forkNum);
default:
elog(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence);
neon_log(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence);
}
if (get_cached_relsize(InfoFromSMgrRel(reln), forkNum, &n_blocks))
@@ -1554,7 +1659,7 @@ neon_exists(SMgrRelation reln, ForkNumber forkNum)
case T_NeonErrorResponse:
ereport(ERROR,
(errcode(ERRCODE_IO_ERROR),
errmsg("could not read relation existence of rel %u/%u/%u.%u from page server at lsn %X/%08X",
errmsg(NEON_TAG "could not read relation existence of rel %u/%u/%u.%u from page server at lsn %X/%08X",
RelFileInfoFmt(InfoFromSMgrRel(reln)),
forkNum,
(uint32) (request_lsn >> 32), (uint32) request_lsn),
@@ -1563,7 +1668,7 @@ neon_exists(SMgrRelation reln, ForkNumber forkNum)
break;
default:
elog(ERROR, "unexpected response from page server with tag 0x%02x", resp->tag);
neon_log(ERROR, "unexpected response from page server with tag 0x%02x", resp->tag);
}
pfree(resp);
return exists;
@@ -1580,7 +1685,7 @@ neon_create(SMgrRelation reln, ForkNumber forkNum, bool isRedo)
switch (reln->smgr_relpersistence)
{
case 0:
elog(ERROR, "cannot call smgrcreate() on rel with unknown persistence");
neon_log(ERROR, "cannot call smgrcreate() on rel with unknown persistence");
case RELPERSISTENCE_PERMANENT:
break;
@@ -1591,10 +1696,10 @@ neon_create(SMgrRelation reln, ForkNumber forkNum, bool isRedo)
return;
default:
elog(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence);
neon_log(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence);
}
elog(SmgrTrace, "Create relation %u/%u/%u.%u",
neon_log(SmgrTrace, "Create relation %u/%u/%u.%u",
RelFileInfoFmt(InfoFromSMgrRel(reln)),
forkNum);
@@ -1689,7 +1794,7 @@ neon_extend(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno,
switch (reln->smgr_relpersistence)
{
case 0:
elog(ERROR, "cannot call smgrextend() on rel with unknown persistence");
neon_log(ERROR, "cannot call smgrextend() on rel with unknown persistence");
case RELPERSISTENCE_PERMANENT:
break;
@@ -1700,7 +1805,7 @@ neon_extend(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno,
return;
default:
elog(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence);
neon_log(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence);
}
/*
@@ -1719,7 +1824,7 @@ neon_extend(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno,
if (current_size >= ((uint64) max_cluster_size) * 1024 * 1024)
ereport(ERROR,
(errcode(ERRCODE_DISK_FULL),
errmsg("could not extend file because project size limit (%d MB) has been exceeded",
errmsg(NEON_TAG "could not extend file because project size limit (%d MB) has been exceeded",
max_cluster_size),
errhint("This limit is defined externally by the project size limit, and internally by neon.max_cluster_size GUC")));
}
@@ -1738,7 +1843,7 @@ neon_extend(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno,
set_cached_relsize(InfoFromSMgrRel(reln), forkNum, blkno + 1);
lsn = PageGetLSN((Page) buffer);
elog(SmgrTrace, "smgrextend called for %u/%u/%u.%u blk %u, page LSN: %X/%08X",
neon_log(SmgrTrace, "smgrextend called for %u/%u/%u.%u blk %u, page LSN: %X/%08X",
RelFileInfoFmt(InfoFromSMgrRel(reln)),
forkNum, blkno,
(uint32) (lsn >> 32), (uint32) lsn);
@@ -1778,7 +1883,7 @@ neon_zeroextend(SMgrRelation reln, ForkNumber forkNum, BlockNumber blocknum,
switch (reln->smgr_relpersistence)
{
case 0:
elog(ERROR, "cannot call smgrextend() on rel with unknown persistence");
neon_log(ERROR, "cannot call smgrextend() on rel with unknown persistence");
case RELPERSISTENCE_PERMANENT:
break;
@@ -1789,7 +1894,7 @@ neon_zeroextend(SMgrRelation reln, ForkNumber forkNum, BlockNumber blocknum,
return;
default:
elog(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence);
neon_log(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence);
}
if (max_cluster_size > 0 &&
@@ -1801,7 +1906,7 @@ neon_zeroextend(SMgrRelation reln, ForkNumber forkNum, BlockNumber blocknum,
if (current_size >= ((uint64) max_cluster_size) * 1024 * 1024)
ereport(ERROR,
(errcode(ERRCODE_DISK_FULL),
errmsg("could not extend file because cluster size limit (%d MB) has been exceeded",
errmsg(NEON_TAG "could not extend file because cluster size limit (%d MB) has been exceeded",
max_cluster_size),
errhint("This limit is defined by neon.max_cluster_size GUC")));
}
@@ -1814,7 +1919,7 @@ neon_zeroextend(SMgrRelation reln, ForkNumber forkNum, BlockNumber blocknum,
if ((uint64) blocknum + nblocks >= (uint64) InvalidBlockNumber)
ereport(ERROR,
(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
errmsg("cannot extend file \"%s\" beyond %u blocks",
errmsg(NEON_TAG "cannot extend file \"%s\" beyond %u blocks",
relpath(reln->smgr_rlocator, forkNum),
InvalidBlockNumber)));
@@ -1875,7 +1980,7 @@ neon_open(SMgrRelation reln)
mdopen(reln);
/* no work */
elog(SmgrTrace, "[NEON_SMGR] open noop");
neon_log(SmgrTrace, "open noop");
}
/*
@@ -1912,7 +2017,7 @@ neon_prefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum)
return mdprefetch(reln, forknum, blocknum);
default:
elog(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence);
neon_log(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence);
}
if (lfc_cache_contains(InfoFromSMgrRel(reln), forknum, blocknum))
@@ -1957,11 +2062,11 @@ neon_writeback(SMgrRelation reln, ForkNumber forknum,
return;
default:
elog(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence);
neon_log(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence);
}
/* not implemented */
elog(SmgrTrace, "[NEON_SMGR] writeback noop");
neon_log(SmgrTrace, "writeback noop");
#ifdef DEBUG_COMPARE_LOCAL
if (IS_LOCAL_REL(reln))
@@ -2091,8 +2196,8 @@ neon_read_at_lsn(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
case T_NeonErrorResponse:
ereport(ERROR,
(errcode(ERRCODE_IO_ERROR),
errmsg("could not read block %u in rel %u/%u/%u.%u from page server at lsn %X/%08X",
blkno,
errmsg(NEON_TAG "[shard %d] could not read block %u in rel %u/%u/%u.%u from page server at lsn %X/%08X",
slot->shard_no, blkno,
RelFileInfoFmt(rinfo),
forkNum,
(uint32) (request_lsn >> 32), (uint32) request_lsn),
@@ -2100,7 +2205,7 @@ neon_read_at_lsn(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
((NeonErrorResponse *) resp)->message)));
break;
default:
elog(ERROR, "unexpected response from page server with tag 0x%02x", resp->tag);
neon_log(ERROR, "unexpected response from page server with tag 0x%02x", resp->tag);
}
/* buffer was used, clean up for later reuse */
@@ -2124,7 +2229,7 @@ neon_read(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno, void *buffer
switch (reln->smgr_relpersistence)
{
case 0:
elog(ERROR, "cannot call smgrread() on rel with unknown persistence");
neon_log(ERROR, "cannot call smgrread() on rel with unknown persistence");
case RELPERSISTENCE_PERMANENT:
break;
@@ -2135,7 +2240,7 @@ neon_read(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno, void *buffer
return;
default:
elog(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence);
neon_log(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence);
}
/* Try to read from local file cache */
@@ -2163,7 +2268,7 @@ neon_read(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno, void *buffer
{
if (!PageIsNew((Page) pageserver_masked))
{
elog(PANIC, "page is new in MD but not in Page Server at blk %u in rel %u/%u/%u fork %u (request LSN %X/%08X):\n%s\n",
neon_log(PANIC, "page is new in MD but not in Page Server at blk %u in rel %u/%u/%u fork %u (request LSN %X/%08X):\n%s\n",
blkno,
RelFileInfoFmt(InfoFromSMgrRel(reln)),
forkNum,
@@ -2173,7 +2278,7 @@ neon_read(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno, void *buffer
}
else if (PageIsNew((Page) buffer))
{
elog(PANIC, "page is new in Page Server but not in MD at blk %u in rel %u/%u/%u fork %u (request LSN %X/%08X):\n%s\n",
neon_log(PANIC, "page is new in Page Server but not in MD at blk %u in rel %u/%u/%u fork %u (request LSN %X/%08X):\n%s\n",
blkno,
RelFileInfoFmt(InfoFromSMgrRel(reln)),
forkNum,
@@ -2188,7 +2293,7 @@ neon_read(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno, void *buffer
if (memcmp(mdbuf_masked, pageserver_masked, BLCKSZ) != 0)
{
elog(PANIC, "heap buffers differ at blk %u in rel %u/%u/%u fork %u (request LSN %X/%08X):\n------ MD ------\n%s\n------ Page Server ------\n%s\n",
neon_log(PANIC, "heap buffers differ at blk %u in rel %u/%u/%u fork %u (request LSN %X/%08X):\n------ MD ------\n%s\n------ Page Server ------\n%s\n",
blkno,
RelFileInfoFmt(InfoFromSMgrRel(reln)),
forkNum,
@@ -2207,7 +2312,7 @@ neon_read(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno, void *buffer
if (memcmp(mdbuf_masked, pageserver_masked, BLCKSZ) != 0)
{
elog(PANIC, "btree buffers differ at blk %u in rel %u/%u/%u fork %u (request LSN %X/%08X):\n------ MD ------\n%s\n------ Page Server ------\n%s\n",
neon_log(PANIC, "btree buffers differ at blk %u in rel %u/%u/%u fork %u (request LSN %X/%08X):\n------ MD ------\n%s\n------ Page Server ------\n%s\n",
blkno,
RelFileInfoFmt(InfoFromSMgrRel(reln)),
forkNum,
@@ -2287,13 +2392,13 @@ neon_write(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, const vo
return;
default:
elog(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence);
neon_log(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence);
}
neon_wallog_page(reln, forknum, blocknum, buffer, false);
lsn = PageGetLSN((Page) buffer);
elog(SmgrTrace, "smgrwrite called for %u/%u/%u.%u blk %u, page LSN: %X/%08X",
neon_log(SmgrTrace, "smgrwrite called for %u/%u/%u.%u blk %u, page LSN: %X/%08X",
RelFileInfoFmt(InfoFromSMgrRel(reln)),
forknum, blocknum,
(uint32) (lsn >> 32), (uint32) lsn);
@@ -2320,7 +2425,7 @@ neon_nblocks(SMgrRelation reln, ForkNumber forknum)
switch (reln->smgr_relpersistence)
{
case 0:
elog(ERROR, "cannot call smgrnblocks() on rel with unknown persistence");
neon_log(ERROR, "cannot call smgrnblocks() on rel with unknown persistence");
break;
case RELPERSISTENCE_PERMANENT:
@@ -2331,12 +2436,12 @@ neon_nblocks(SMgrRelation reln, ForkNumber forknum)
return mdnblocks(reln, forknum);
default:
elog(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence);
neon_log(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence);
}
if (get_cached_relsize(InfoFromSMgrRel(reln), forknum, &n_blocks))
{
elog(SmgrTrace, "cached nblocks for %u/%u/%u.%u: %u blocks",
neon_log(SmgrTrace, "cached nblocks for %u/%u/%u.%u: %u blocks",
RelFileInfoFmt(InfoFromSMgrRel(reln)),
forknum, n_blocks);
return n_blocks;
@@ -2364,7 +2469,7 @@ neon_nblocks(SMgrRelation reln, ForkNumber forknum)
case T_NeonErrorResponse:
ereport(ERROR,
(errcode(ERRCODE_IO_ERROR),
errmsg("could not read relation size of rel %u/%u/%u.%u from page server at lsn %X/%08X",
errmsg(NEON_TAG "could not read relation size of rel %u/%u/%u.%u from page server at lsn %X/%08X",
RelFileInfoFmt(InfoFromSMgrRel(reln)),
forknum,
(uint32) (request_lsn >> 32), (uint32) request_lsn),
@@ -2373,11 +2478,11 @@ neon_nblocks(SMgrRelation reln, ForkNumber forknum)
break;
default:
elog(ERROR, "unexpected response from page server with tag 0x%02x", resp->tag);
neon_log(ERROR, "unexpected response from page server with tag 0x%02x", resp->tag);
}
update_cached_relsize(InfoFromSMgrRel(reln), forknum, n_blocks);
elog(SmgrTrace, "neon_nblocks: rel %u/%u/%u fork %u (request LSN %X/%08X): %u blocks",
neon_log(SmgrTrace, "neon_nblocks: rel %u/%u/%u fork %u (request LSN %X/%08X): %u blocks",
RelFileInfoFmt(InfoFromSMgrRel(reln)),
forknum,
(uint32) (request_lsn >> 32), (uint32) request_lsn,
@@ -2420,7 +2525,7 @@ neon_dbsize(Oid dbNode)
case T_NeonErrorResponse:
ereport(ERROR,
(errcode(ERRCODE_IO_ERROR),
errmsg("could not read db size of db %u from page server at lsn %X/%08X",
errmsg(NEON_TAG "could not read db size of db %u from page server at lsn %X/%08X",
dbNode,
(uint32) (request_lsn >> 32), (uint32) request_lsn),
errdetail("page server returned error: %s",
@@ -2428,10 +2533,10 @@ neon_dbsize(Oid dbNode)
break;
default:
elog(ERROR, "unexpected response from page server with tag 0x%02x", resp->tag);
neon_log(ERROR, "unexpected response from page server with tag 0x%02x", resp->tag);
}
elog(SmgrTrace, "neon_dbsize: db %u (request LSN %X/%08X): %ld bytes",
neon_log(SmgrTrace, "neon_dbsize: db %u (request LSN %X/%08X): %ld bytes",
dbNode,
(uint32) (request_lsn >> 32), (uint32) request_lsn,
db_size);
@@ -2451,7 +2556,7 @@ neon_truncate(SMgrRelation reln, ForkNumber forknum, BlockNumber nblocks)
switch (reln->smgr_relpersistence)
{
case 0:
elog(ERROR, "cannot call smgrtruncate() on rel with unknown persistence");
neon_log(ERROR, "cannot call smgrtruncate() on rel with unknown persistence");
break;
case RELPERSISTENCE_PERMANENT:
@@ -2463,7 +2568,7 @@ neon_truncate(SMgrRelation reln, ForkNumber forknum, BlockNumber nblocks)
return;
default:
elog(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence);
neon_log(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence);
}
set_cached_relsize(InfoFromSMgrRel(reln), forknum, nblocks);
@@ -2519,7 +2624,7 @@ neon_immedsync(SMgrRelation reln, ForkNumber forknum)
switch (reln->smgr_relpersistence)
{
case 0:
elog(ERROR, "cannot call smgrimmedsync() on rel with unknown persistence");
neon_log(ERROR, "cannot call smgrimmedsync() on rel with unknown persistence");
break;
case RELPERSISTENCE_PERMANENT:
@@ -2531,10 +2636,10 @@ neon_immedsync(SMgrRelation reln, ForkNumber forknum)
return;
default:
elog(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence);
neon_log(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence);
}
elog(SmgrTrace, "[NEON_SMGR] immedsync noop");
neon_log(SmgrTrace, "[NEON_SMGR] immedsync noop");
#ifdef DEBUG_COMPARE_LOCAL
if (IS_LOCAL_REL(reln))
@@ -2559,17 +2664,17 @@ neon_start_unlogged_build(SMgrRelation reln)
* progress at a time. That's enough for the current usage.
*/
if (unlogged_build_phase != UNLOGGED_BUILD_NOT_IN_PROGRESS)
elog(ERROR, "unlogged relation build is already in progress");
neon_log(ERROR, "unlogged relation build is already in progress");
Assert(unlogged_build_rel == NULL);
ereport(SmgrTrace,
(errmsg("starting unlogged build of relation %u/%u/%u",
(errmsg(NEON_TAG "starting unlogged build of relation %u/%u/%u",
RelFileInfoFmt(InfoFromSMgrRel(reln)))));
switch (reln->smgr_relpersistence)
{
case 0:
elog(ERROR, "cannot call smgr_start_unlogged_build() on rel with unknown persistence");
neon_log(ERROR, "cannot call smgr_start_unlogged_build() on rel with unknown persistence");
break;
case RELPERSISTENCE_PERMANENT:
@@ -2582,11 +2687,11 @@ neon_start_unlogged_build(SMgrRelation reln)
return;
default:
elog(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence);
neon_log(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence);
}
if (smgrnblocks(reln, MAIN_FORKNUM) != 0)
elog(ERROR, "cannot perform unlogged index build, index is not empty ");
neon_log(ERROR, "cannot perform unlogged index build, index is not empty ");
unlogged_build_rel = reln;
unlogged_build_phase = UNLOGGED_BUILD_PHASE_1;
@@ -2613,7 +2718,7 @@ neon_finish_unlogged_build_phase_1(SMgrRelation reln)
Assert(unlogged_build_rel == reln);
ereport(SmgrTrace,
(errmsg("finishing phase 1 of unlogged build of relation %u/%u/%u",
(errmsg(NEON_TAG "finishing phase 1 of unlogged build of relation %u/%u/%u",
RelFileInfoFmt(InfoFromSMgrRel(reln)))));
if (unlogged_build_phase == UNLOGGED_BUILD_NOT_PERMANENT)
@@ -2642,7 +2747,7 @@ neon_end_unlogged_build(SMgrRelation reln)
Assert(unlogged_build_rel == reln);
ereport(SmgrTrace,
(errmsg("ending unlogged build of relation %u/%u/%u",
(errmsg(NEON_TAG "ending unlogged build of relation %u/%u/%u",
RelFileInfoFmt(InfoFromNInfoB(rinfob)))));
if (unlogged_build_phase != UNLOGGED_BUILD_NOT_PERMANENT)
@@ -2657,7 +2762,7 @@ neon_end_unlogged_build(SMgrRelation reln)
rinfob = InfoBFromSMgrRel(reln);
for (int forknum = 0; forknum <= MAX_FORKNUM; forknum++)
{
elog(SmgrTrace, "forgetting cached relsize for %u/%u/%u.%u",
neon_log(SmgrTrace, "forgetting cached relsize for %u/%u/%u.%u",
RelFileInfoFmt(InfoFromNInfoB(rinfob)),
forknum);
@@ -2672,6 +2777,61 @@ neon_end_unlogged_build(SMgrRelation reln)
unlogged_build_phase = UNLOGGED_BUILD_NOT_IN_PROGRESS;
}
static int
neon_read_slru_segment(SMgrRelation reln, SlruKind kind, int segno, void* buffer)
{
XLogRecPtr request_lsn;
/* TODO: any better alternative than flush LSN? Actually we to request SLRU at basebackup creation time... */
#if PG_VERSION_NUM >= 150000
request_lsn = GetFlushRecPtr(NULL);
#else
request_lsn = GetFlushRecPtr();
#endif
NeonResponse *resp;
shardno_t shard_no = 0; /* SLRU are at the zero shard */
NeonGetSlruSegmentRequest request = {
.req.tag = T_NeonGetSlruSegmentRequest,
.req.latest = false,
.req.lsn = request_lsn,
.kind = kind,
.segno = segno
};
int n_blocks;
do
{
while (!page_server->send(shard_no, &request.req) || !page_server->flush(shard_no));
consume_prefetch_responses();
resp = page_server->receive(shard_no);
} while (resp == NULL);
switch (resp->tag)
{
case T_NeonGetSlruSegmentResponse:
n_blocks = ((NeonGetSlruSegmentResponse *) resp)->n_blocks;
memcpy(buffer, ((NeonGetSlruSegmentResponse *) resp)->data, n_blocks*BLCKSZ);
break;
case T_NeonErrorResponse:
ereport(ERROR,
(errcode(ERRCODE_IO_ERROR),
errmsg(NEON_TAG "could not read SLRU %d segment %d at lsn %X/%08X",
kind,
segno,
(uint32) (request_lsn >> 32), (uint32) request_lsn),
errdetail("page server returned error: %s",
((NeonErrorResponse *) resp)->message)));
break;
default:
neon_log(ERROR, "unexpected response from page server with tag 0x%02x", resp->tag);
}
pfree(resp);
return n_blocks;
}
static void
AtEOXact_neon(XactEvent event, void *arg)
{
@@ -2700,7 +2860,7 @@ AtEOXact_neon(XactEvent event, void *arg)
unlogged_build_phase = UNLOGGED_BUILD_NOT_IN_PROGRESS;
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
(errmsg("unlogged index build was not properly finished"))));
(errmsg(NEON_TAG "unlogged index build was not properly finished"))));
}
break;
}
@@ -2730,6 +2890,8 @@ static const struct f_smgr neon_smgr =
.smgr_start_unlogged_build = neon_start_unlogged_build,
.smgr_finish_unlogged_build_phase_1 = neon_finish_unlogged_build_phase_1,
.smgr_end_unlogged_build = neon_end_unlogged_build,
.smgr_read_slru_segment = neon_read_slru_segment,
};
const f_smgr *
@@ -2799,7 +2961,7 @@ neon_extend_rel_size(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
set_cached_relsize(rinfo, forknum, relsize);
SetLastWrittenLSNForRelation(end_recptr, rinfo, forknum);
elog(SmgrTrace, "Set length to %d", relsize);
neon_log(SmgrTrace, "Set length to %d", relsize);
}
}
@@ -2887,7 +3049,7 @@ neon_redo_read_buffer_filter(XLogReaderState *record, uint8 block_id)
#if PG_VERSION_NUM < 150000
if (!XLogRecGetBlockTag(record, block_id, &rinfo, &forknum, &blkno))
elog(PANIC, "failed to locate backup block with ID %d", block_id);
neon_log(PANIC, "failed to locate backup block with ID %d", block_id);
#else
XLogRecGetBlockTag(record, block_id, &rinfo, &forknum, &blkno);
#endif

View File

@@ -60,6 +60,7 @@ fn analyze_trace<R: std::io::Read>(mut reader: R) {
match msg {
PagestreamFeMessage::Exists(_) => {}
PagestreamFeMessage::Nblocks(_) => {}
PagestreamFeMessage::GetSlruSegment(_) => {}
PagestreamFeMessage::GetPage(req) => {
total += 1;

View File

@@ -1,5 +1,5 @@
{
"postgres-v16": "e3a22b72922055f9212eca12700190f118578362",
"postgres-v15": "bc88f539312fcc4bb292ce94ae9db09ab6656e8a",
"postgres-v14": "dd067cf656f6810a25aca6025633d32d02c5085a"
"postgres-v16": "ce3b15942c91adec8e83a43d2cb713038f2fcf53",
"postgres-v15": "7a9d31fd826d251b7f62f1f83808bcd00c5ef554",
"postgres-v14": "3b28a698276dd17aadd883adc8e0a9ff0f87be0f"
}