Mirror of https://github.com/neondatabase/neon.git (synced 2026-02-04 11:10:37 +00:00)

Compare commits: add_audit_ ... load_slru_ (20 commits)

| SHA1 |
|---|
| a170e49e91 |
| 1cae50eacc |
| ac17c2f69c |
| 322dd3cf00 |
| 8e6e4b1112 |
| 818290b7b6 |
| 7effdf448a |
| ffc2145cfb |
| 7bad8e80fb |
| d3f7cf428e |
| 0556364f21 |
| 3b2e84fddb |
| 3705049a24 |
| fd65d0eb80 |
| df938c889d |
| 8c1896be4e |
| 52cc1f8ede |
| 845846695c |
| 4274807eda |
| e10515f3af |
@@ -5,6 +5,7 @@ use std::{
};

use byteorder::{BigEndian, ReadBytesExt};
use postgres_ffi::BLCKSZ;
use serde::{Deserialize, Serialize};
use serde_with::serde_as;
use strum_macros;
@@ -570,6 +571,7 @@ pub enum PagestreamFeMessage {
    Nblocks(PagestreamNblocksRequest),
    GetPage(PagestreamGetPageRequest),
    DbSize(PagestreamDbSizeRequest),
    GetSlruSegment(PagestreamGetSlruSegmentRequest),
}

// Wrapped in libpq CopyData
@@ -579,6 +581,7 @@ pub enum PagestreamBeMessage {
    GetPage(PagestreamGetPageResponse),
    Error(PagestreamErrorResponse),
    DbSize(PagestreamDbSizeResponse),
    GetSlruSegment(PagestreamGetSlruSegmentResponse),
}

#[derive(Debug, PartialEq, Eq)]
@@ -610,6 +613,14 @@ pub struct PagestreamDbSizeRequest {
    pub dbnode: u32,
}

#[derive(Debug, PartialEq, Eq)]
pub struct PagestreamGetSlruSegmentRequest {
    pub latest: bool,
    pub lsn: Lsn,
    pub kind: u8,
    pub segno: u32,
}

#[derive(Debug)]
pub struct PagestreamExistsResponse {
    pub exists: bool,
@@ -625,6 +636,11 @@ pub struct PagestreamGetPageResponse {
    pub page: Bytes,
}

#[derive(Debug)]
pub struct PagestreamGetSlruSegmentResponse {
    pub segment: Bytes,
}

#[derive(Debug)]
pub struct PagestreamErrorResponse {
    pub message: String,
@@ -677,6 +693,14 @@ impl PagestreamFeMessage {
                bytes.put_u64(req.lsn.0);
                bytes.put_u32(req.dbnode);
            }

            Self::GetSlruSegment(req) => {
                bytes.put_u8(4);
                bytes.put_u8(u8::from(req.latest));
                bytes.put_u64(req.lsn.0);
                bytes.put_u8(req.kind);
                bytes.put_u32(req.segno);
            }
        }

        bytes.into()
@@ -727,6 +751,14 @@ impl PagestreamFeMessage {
                lsn: Lsn::from(body.read_u64::<BigEndian>()?),
                dbnode: body.read_u32::<BigEndian>()?,
            })),
            4 => Ok(PagestreamFeMessage::GetSlruSegment(
                PagestreamGetSlruSegmentRequest {
                    latest: body.read_u8()? != 0,
                    lsn: Lsn::from(body.read_u64::<BigEndian>()?),
                    kind: body.read_u8()?,
                    segno: body.read_u32::<BigEndian>()?,
                },
            )),
            _ => bail!("unknown smgr message tag: {:?}", msg_tag),
        }
    }
@@ -761,6 +793,12 @@ impl PagestreamBeMessage {
                bytes.put_u8(104); /* tag from pagestore_client.h */
                bytes.put_i64(resp.db_size);
            }

            Self::GetSlruSegment(resp) => {
                bytes.put_u8(105); /* tag from pagestore_client.h */
                bytes.put_u32((resp.segment.len() / BLCKSZ as usize) as u32);
                bytes.put(&resp.segment[..]);
            }
        }

        bytes.into()
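The hunks above fix the wire format of the new message pair: a GetSlruSegment request is tag byte 4 followed by the `latest` flag, the LSN, the SLRU kind, and the segment number, and the matching response is tag byte 105 followed by a block count and the raw segment bytes. The sketch below is a minimal, self-contained round trip of the request layout using the `bytes` crate; the struct and function names are illustrative only, and `Lsn` is simplified to a bare `u64`.

```rust
// Minimal, self-contained sketch of the GetSlruSegment request wire format
// shown above (tag byte 4, then latest, lsn, kind, segno, all big-endian).
// Names here are illustrative, not the pageserver's actual API.
use bytes::{Buf, BufMut, BytesMut};

struct GetSlruSegmentRequest {
    latest: bool,
    lsn: u64,
    kind: u8,
    segno: u32,
}

fn encode(req: &GetSlruSegmentRequest) -> BytesMut {
    let mut buf = BytesMut::new();
    buf.put_u8(4); // message tag
    buf.put_u8(u8::from(req.latest));
    buf.put_u64(req.lsn);
    buf.put_u8(req.kind);
    buf.put_u32(req.segno);
    buf
}

fn decode(mut buf: &[u8]) -> Option<GetSlruSegmentRequest> {
    if buf.remaining() < 15 || buf.get_u8() != 4 {
        return None;
    }
    Some(GetSlruSegmentRequest {
        latest: buf.get_u8() != 0,
        lsn: buf.get_u64(),
        kind: buf.get_u8(),
        segno: buf.get_u32(),
    })
}

fn main() {
    let req = GetSlruSegmentRequest { latest: true, lsn: 0x16_9000_0000, kind: 0, segno: 7 };
    let decoded = decode(&encode(&req)).unwrap();
    assert_eq!((decoded.latest, decoded.lsn, decoded.kind, decoded.segno), (true, req.lsn, 0, 7));
}
```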
@@ -108,9 +108,22 @@ impl RelTag
/// These files are divided into segments, which are divided into
/// pages of the same BLCKSZ as used for relation files.
///
#[derive(Debug, Clone, Copy, Hash, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
#[derive(
    Debug,
    Clone,
    Copy,
    strum_macros::FromRepr,
    Hash,
    Serialize,
    Deserialize,
    PartialEq,
    Eq,
    PartialOrd,
    Ord,
)]
#[repr(u8)]
pub enum SlruKind {
    Clog,
    Clog = 0,
    MultiXactMembers,
    MultiXactOffsets,
}
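The derive change above gives SlruKind a stable `u8` representation and adds `strum_macros::FromRepr`, which generates a `from_repr(u8) -> Option<SlruKind>` constructor; the page service handler further down uses exactly that to turn the raw `kind` byte from a request back into the enum. A small sketch of the pattern, using a local copy of the enum and assuming the `strum_macros` crate:

```rust
// Mapping a wire-level u8 back to an enum with strum's FromRepr, mirroring
// the SlruKind change above. The enum here is a local copy for illustration.
use strum_macros::FromRepr;

#[derive(Debug, PartialEq, FromRepr)]
#[repr(u8)]
enum SlruKind {
    Clog = 0,
    MultiXactMembers,
    MultiXactOffsets,
}

fn main() {
    // from_repr returns None for bytes that name no variant, which the
    // handler maps to an "invalid SLRU kind" error.
    assert_eq!(SlruKind::from_repr(0), Some(SlruKind::Clog));
    assert_eq!(SlruKind::from_repr(2), Some(SlruKind::MultiXactOffsets));
    assert_eq!(SlruKind::from_repr(7), None);
}
```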
@@ -139,6 +139,8 @@ where
    async fn send_tarball(mut self) -> anyhow::Result<()> {
        // TODO include checksum

        let on_demand_slru_download = true; // TODO: should it be feature flag, config parameter or whatever else ?

        // Create pgdata subdirs structure
        for dir in PGDATA_SUBDIRS.iter() {
            let header = new_tar_header_dir(dir)?;
@@ -165,19 +167,20 @@ where
                    .context("could not add config file to basebackup tarball")?;
            }
        }

        // Gather non-relational files from object storage pages.
        for kind in [
            SlruKind::Clog,
            SlruKind::MultiXactOffsets,
            SlruKind::MultiXactMembers,
        ] {
            for segno in self
                .timeline
                .list_slru_segments(kind, self.lsn, self.ctx)
                .await?
            {
                self.add_slru_segment(kind, segno).await?;
        if !on_demand_slru_download {
            // Gather non-relational files from object storage pages.
            for kind in [
                SlruKind::Clog,
                SlruKind::MultiXactOffsets,
                SlruKind::MultiXactMembers,
            ] {
                for segno in self
                    .timeline
                    .list_slru_segments(kind, self.lsn, self.ctx)
                    .await?
                {
                    self.add_slru_segment(kind, segno).await?;
                }
            }
        }
@@ -961,6 +961,7 @@ pub enum SmgrQueryType {
    GetRelSize,
    GetPageAtLsn,
    GetDbSize,
    GetSlruSegment,
}

#[derive(Debug)]
@@ -1030,6 +1031,7 @@ mod smgr_query_time_tests {
        (GetRelSize, "get_rel_size"),
        (GetPageAtLsn, "get_page_at_lsn"),
        (GetDbSize, "get_db_size"),
        (GetSlruSegment, "get_slru_segment"),
    ];
    for (op, expect) in expect {
        let actual: &'static str = op.into();
@@ -19,7 +19,8 @@ use pageserver_api::models::{
    PagestreamBeMessage, PagestreamDbSizeRequest, PagestreamDbSizeResponse,
    PagestreamErrorResponse, PagestreamExistsRequest, PagestreamExistsResponse,
    PagestreamFeMessage, PagestreamGetPageRequest, PagestreamGetPageResponse,
    PagestreamNblocksRequest, PagestreamNblocksResponse,
    PagestreamGetSlruSegmentRequest, PagestreamGetSlruSegmentResponse, PagestreamNblocksRequest,
    PagestreamNblocksResponse,
};
use postgres_backend::{self, is_expected_io_error, AuthType, PostgresBackend, QueryError};
use pq_proto::framed::ConnectionError;
@@ -64,6 +65,7 @@ use crate::tenant::mgr::ShardSelector;
use crate::tenant::Timeline;
use crate::trace::Tracer;

use pageserver_api::reltag::SlruKind;
use postgres_ffi::pg_constants::DEFAULTTABLESPACE_OID;
use postgres_ffi::BLCKSZ;

@@ -518,6 +520,16 @@ impl PageServerHandler {
                    span,
                )
            }
            PagestreamFeMessage::GetSlruSegment(req) => {
                let _timer = metrics.start_timer(metrics::SmgrQueryType::GetSlruSegment);
                let span = tracing::info_span!("handle_get_slru_segment_request", kind = %req.kind, segno = %req.segno, req_lsn = %req.lsn);
                (
                    self.handle_get_slru_segment_request(&timeline, &req, &ctx)
                        .instrument(span.clone())
                        .await,
                    span,
                )
            }
        };

        if let Err(e) = &response {
@@ -862,6 +874,25 @@ impl PageServerHandler {
        }))
    }

    async fn handle_get_slru_segment_request(
        &self,
        timeline: &Timeline,
        req: &PagestreamGetSlruSegmentRequest,
        ctx: &RequestContext,
    ) -> anyhow::Result<PagestreamBeMessage> {
        let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
        let lsn =
            Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn, ctx)
                .await?;

        let kind = SlruKind::from_repr(req.kind).ok_or(anyhow::anyhow!("invalid SLRU kind"))?;
        let segment = timeline.get_slru_segment(kind, req.segno, lsn, ctx).await?;

        Ok(PagestreamBeMessage::GetSlruSegment(
            PagestreamGetSlruSegmentResponse { segment },
        ))
    }

    #[allow(clippy::too_many_arguments)]
    #[instrument(skip_all, fields(?lsn, ?prev_lsn, %full_backup))]
    async fn handle_basebackup_request<IO>(
@@ -12,7 +12,7 @@ use crate::keyspace::{KeySpace, KeySpaceAccum};
use crate::repository::*;
use crate::walrecord::NeonWalRecord;
use anyhow::Context;
use bytes::{Buf, Bytes};
use bytes::{Buf, Bytes, BytesMut};
use pageserver_api::key::is_rel_block_key;
use pageserver_api::reltag::{RelTag, SlruKind};
use postgres_ffi::relfile_utils::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM};
@@ -314,6 +314,25 @@ impl Timeline {
        }
    }

    /// Get the whole SLRU segment
    pub async fn get_slru_segment(
        &self,
        kind: SlruKind,
        segno: u32,
        lsn: Lsn,
        ctx: &RequestContext,
    ) -> Result<Bytes, PageReconstructError> {
        let n_blocks = self.get_slru_segment_size(kind, segno, lsn, ctx).await?;
        let mut segment = BytesMut::with_capacity(n_blocks as usize * BLCKSZ as usize);
        for blkno in 0..n_blocks {
            let block = self
                .get_slru_page_at_lsn(kind, segno, blkno, lsn, ctx)
                .await?;
            segment.extend_from_slice(&block[..BLCKSZ as usize]);
        }
        Ok(segment.freeze())
    }

    /// Look up given SLRU page version.
    pub async fn get_slru_page_at_lsn(
        &self,
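`get_slru_segment` above materializes a whole SLRU segment by concatenating its pages, so the returned buffer is always `n_blocks * BLCKSZ` bytes and the block count sent back on the wire is simply `segment.len() / BLCKSZ`. A hedged sketch of how a consumer might walk such a buffer page by page (BLCKSZ = 8192 is the default Postgres block size; the helper is illustrative, not part of the pageserver API):

```rust
// Walking a concatenated SLRU segment page by page, as a consumer of
// get_slru_segment might. Illustrative helper only.
const BLCKSZ: usize = 8192; // default Postgres block size

fn for_each_page(segment: &[u8], mut f: impl FnMut(u32, &[u8])) {
    assert_eq!(segment.len() % BLCKSZ, 0, "segment must be whole pages");
    for (blkno, page) in segment.chunks_exact(BLCKSZ).enumerate() {
        f(blkno as u32, page);
    }
}

fn main() {
    let segment = vec![0u8; 3 * BLCKSZ]; // pretend three-page segment
    let mut pages = 0;
    for_each_page(&segment, |_blkno, page| {
        assert_eq!(page.len(), BLCKSZ);
        pages += 1;
    });
    assert_eq!(pages, segment.len() / BLCKSZ);
}
```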
@@ -312,13 +312,13 @@ lfc_change_limit_hook(int newval, void *extra)
|
||||
Assert(victim->access_count == 0);
|
||||
#ifdef FALLOC_FL_PUNCH_HOLE
|
||||
if (fallocate(lfc_desc, FALLOC_FL_PUNCH_HOLE | FALLOC_FL_KEEP_SIZE, (off_t) victim->offset * BLOCKS_PER_CHUNK * BLCKSZ, BLOCKS_PER_CHUNK * BLCKSZ) < 0)
|
||||
elog(LOG, "Failed to punch hole in file: %m");
|
||||
neon_log(LOG, "Failed to punch hole in file: %m");
|
||||
#endif
|
||||
hash_search_with_hash_value(lfc_hash, &victim->key, victim->hash, HASH_REMOVE, NULL);
|
||||
lfc_ctl->used -= 1;
|
||||
}
|
||||
lfc_ctl->limit = new_size;
|
||||
elog(DEBUG1, "set local file cache limit to %d", new_size);
|
||||
neon_log(DEBUG1, "set local file cache limit to %d", new_size);
|
||||
|
||||
LWLockRelease(lfc_lock);
|
||||
}
|
||||
@@ -331,7 +331,7 @@ lfc_init(void)
|
||||
* shared_preload_libraries.
|
||||
*/
|
||||
if (!process_shared_preload_libraries_in_progress)
|
||||
elog(ERROR, "Neon module should be loaded via shared_preload_libraries");
|
||||
neon_log(ERROR, "Neon module should be loaded via shared_preload_libraries");
|
||||
|
||||
|
||||
DefineCustomIntVariable("neon.max_file_cache_size",
|
||||
@@ -647,7 +647,7 @@ lfc_write(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, const void
|
||||
Assert(victim->access_count == 0);
|
||||
entry->offset = victim->offset; /* grab victim's chunk */
|
||||
hash_search_with_hash_value(lfc_hash, &victim->key, victim->hash, HASH_REMOVE, NULL);
|
||||
elog(DEBUG2, "Swap file cache page");
|
||||
neon_log(DEBUG2, "Swap file cache page");
|
||||
}
|
||||
else
|
||||
{
|
||||
@@ -850,10 +850,10 @@ local_cache_pages(PG_FUNCTION_ARGS)
|
||||
* wrong) function definition though.
|
||||
*/
|
||||
if (get_call_result_type(fcinfo, NULL, &expected_tupledesc) != TYPEFUNC_COMPOSITE)
|
||||
elog(ERROR, "return type must be a row type");
|
||||
neon_log(ERROR, "return type must be a row type");
|
||||
|
||||
if (expected_tupledesc->natts != NUM_LOCALCACHE_PAGES_ELEM)
|
||||
elog(ERROR, "incorrect number of output arguments");
|
||||
neon_log(ERROR, "incorrect number of output arguments");
|
||||
|
||||
/* Construct a tuple descriptor for the result rows. */
|
||||
tupledesc = CreateTemplateTupleDesc(expected_tupledesc->natts);
|
||||
|
||||
@@ -18,6 +18,7 @@
|
||||
#include "fmgr.h"
|
||||
#include "access/xlog.h"
|
||||
#include "access/xlogutils.h"
|
||||
#include "common/hashfn.h"
|
||||
#include "storage/buf_internals.h"
|
||||
#include "storage/lwlock.h"
|
||||
#include "storage/ipc.h"
|
||||
@@ -36,22 +37,12 @@
|
||||
#include "neon.h"
|
||||
#include "walproposer.h"
|
||||
#include "neon_utils.h"
|
||||
#include "control_plane_connector.h"
|
||||
|
||||
#define PageStoreTrace DEBUG5
|
||||
|
||||
#define RECONNECT_INTERVAL_USEC 1000000
|
||||
|
||||
bool connected = false;
|
||||
PGconn *pageserver_conn = NULL;
|
||||
|
||||
/*
|
||||
* WaitEventSet containing:
|
||||
* - WL_SOCKET_READABLE on pageserver_conn,
|
||||
* - WL_LATCH_SET on MyLatch, and
|
||||
* - WL_EXIT_ON_PM_DEATH.
|
||||
*/
|
||||
WaitEventSet *pageserver_conn_wes = NULL;
|
||||
|
||||
/* GUCs */
|
||||
char *neon_timeline;
|
||||
char *neon_tenant;
|
||||
@@ -64,87 +55,176 @@ int flush_every_n_requests = 8;
|
||||
|
||||
int n_reconnect_attempts = 0;
|
||||
int max_reconnect_attempts = 60;
|
||||
int stripe_size;
|
||||
|
||||
#define MAX_PAGESERVER_CONNSTRING_SIZE 256
|
||||
bool (*old_redo_read_buffer_filter) (XLogReaderState *record, uint8 block_id) = NULL;
|
||||
|
||||
static bool pageserver_flush(shardno_t shard_no);
|
||||
static void pageserver_disconnect(shardno_t shard_no);
|
||||
static void AssignPageserverConnstring(const char *newval, void *extra);
|
||||
|
||||
static shmem_startup_hook_type prev_shmem_startup_hook;
|
||||
#if PG_VERSION_NUM>=150000
|
||||
static shmem_request_hook_type prev_shmem_request_hook;
|
||||
#endif
|
||||
|
||||
typedef struct
|
||||
{
|
||||
LWLockId lock;
|
||||
pg_atomic_uint64 update_counter;
|
||||
char pageserver_connstring[MAX_PAGESERVER_CONNSTRING_SIZE];
|
||||
} PagestoreShmemState;
|
||||
size_t n_shards;
|
||||
pg_atomic_uint64 begin_update_counter;
|
||||
pg_atomic_uint64 end_update_counter;
|
||||
char shard_connstr[MAX_SHARDS][MAX_PS_CONNSTR_LEN];
|
||||
} ShardMap;
|
||||
|
||||
#if PG_VERSION_NUM >= 150000
|
||||
static shmem_request_hook_type prev_shmem_request_hook = NULL;
|
||||
static void walproposer_shmem_request(void);
|
||||
|
||||
static ShardMap* shard_map;
|
||||
static uint64 shard_map_update_counter;
|
||||
|
||||
typedef struct
|
||||
{
|
||||
/*
|
||||
* Connection for each shard
|
||||
*/
|
||||
PGconn *conn;
|
||||
/*
|
||||
* WaitEventSet containing:
|
||||
* - WL_SOCKET_READABLE on pageserver_conn,
|
||||
* - WL_LATCH_SET on MyLatch, and
|
||||
* - WL_EXIT_ON_PM_DEATH.
|
||||
*/
|
||||
WaitEventSet *wes;
|
||||
} PageServer;
|
||||
|
||||
static PageServer page_servers[MAX_SHARDS];
|
||||
static shardno_t max_attached_shard_no;
|
||||
|
||||
static void
|
||||
psm_shmem_startup(void)
|
||||
{
|
||||
bool found;
|
||||
if (prev_shmem_startup_hook)
|
||||
{
|
||||
prev_shmem_startup_hook();
|
||||
}
|
||||
|
||||
LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);
|
||||
|
||||
shard_map = (ShardMap*)ShmemInitStruct("shard_map", sizeof(ShardMap), &found);
|
||||
if (!found)
|
||||
{
|
||||
shard_map->n_shards = 0;
|
||||
pg_atomic_init_u64(&shard_map->begin_update_counter, 0);
|
||||
pg_atomic_init_u64(&shard_map->end_update_counter, 0);
|
||||
AssignPageserverConnstring(page_server_connstring, NULL);
|
||||
}
|
||||
LWLockRelease(AddinShmemInitLock);
|
||||
}
|
||||
|
||||
static void
|
||||
psm_shmem_request(void)
|
||||
{
|
||||
#if PG_VERSION_NUM>=150000
|
||||
if (prev_shmem_request_hook)
|
||||
prev_shmem_request_hook();
|
||||
#endif
|
||||
static shmem_startup_hook_type prev_shmem_startup_hook;
|
||||
static PagestoreShmemState *pagestore_shared;
|
||||
static uint64 pagestore_local_counter = 0;
|
||||
static char local_pageserver_connstring[MAX_PAGESERVER_CONNSTRING_SIZE];
|
||||
|
||||
bool (*old_redo_read_buffer_filter) (XLogReaderState *record, uint8 block_id) = NULL;
|
||||
|
||||
static bool pageserver_flush(void);
|
||||
static void pageserver_disconnect(void);
|
||||
|
||||
static bool
|
||||
PagestoreShmemIsValid()
|
||||
{
|
||||
return pagestore_shared && UsedShmemSegAddr;
|
||||
}
|
||||
|
||||
static bool
|
||||
CheckPageserverConnstring(char **newval, void **extra, GucSource source)
|
||||
{
|
||||
return strlen(*newval) < MAX_PAGESERVER_CONNSTRING_SIZE;
|
||||
RequestAddinShmemSpace(sizeof(ShardMap));
|
||||
}
|
||||
|
||||
static void
|
||||
AssignPageserverConnstring(const char *newval, void *extra)
|
||||
psm_init(void)
|
||||
{
|
||||
if (!PagestoreShmemIsValid())
|
||||
return;
|
||||
LWLockAcquire(pagestore_shared->lock, LW_EXCLUSIVE);
|
||||
strlcpy(pagestore_shared->pageserver_connstring, newval, MAX_PAGESERVER_CONNSTRING_SIZE);
|
||||
pg_atomic_fetch_add_u64(&pagestore_shared->update_counter, 1);
|
||||
LWLockRelease(pagestore_shared->lock);
|
||||
prev_shmem_startup_hook = shmem_startup_hook;
|
||||
shmem_startup_hook = psm_shmem_startup;
|
||||
#if PG_VERSION_NUM>=150000
|
||||
prev_shmem_request_hook = shmem_request_hook;
|
||||
shmem_request_hook = psm_shmem_request;
|
||||
#else
|
||||
psm_shmem_request();
|
||||
#endif
|
||||
}
|
||||
|
||||
/*
 * Reload the shard map if needed and return the number of shards and the
 * connection string for the specified shard.
 */
static shardno_t
load_shard_map(shardno_t shard_no, char* connstr)
{
    shardno_t n_shards;
    uint64 begin_update_counter;
    uint64 end_update_counter;

    /*
     * There is a race condition here between backends and the postmaster, which
     * can update the shard map. We recheck the update counter after copying the
     * connection string to check that the configuration was not changed.
     */
    do
    {
        begin_update_counter = pg_atomic_read_u64(&shard_map->begin_update_counter);
        end_update_counter = pg_atomic_read_u64(&shard_map->end_update_counter);

        n_shards = shard_map->n_shards;
        if (shard_no >= n_shards)
            neon_log(ERROR, "Shard %d is greater or equal than number of shards %d", shard_no, n_shards);

        if (connstr)
            strncpy(connstr, shard_map->shard_connstr[shard_no], MAX_PS_CONNSTR_LEN);

    }
    while (begin_update_counter != end_update_counter
           || begin_update_counter != pg_atomic_read_u64(&shard_map->begin_update_counter)
           || end_update_counter != pg_atomic_read_u64(&shard_map->end_update_counter));

    if (shard_map_update_counter != end_update_counter)
    {
        /* Reset all connections if connection strings are changed */
        for (shardno_t i = 0; i < max_attached_shard_no; i++)
        {
            if (page_servers[i].conn)
                pageserver_disconnect(i);
        }
        max_attached_shard_no = 0;
        shard_map_update_counter = end_update_counter;
    }

    return n_shards;
}
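load_shard_map above uses the counter pair as a seqlock-style guard: the writer (AssignPageserverConnstring, further down) bumps `begin_update_counter` before touching the connection strings and `end_update_counter` after, and a reader retries until both counters agree and are unchanged across its copy. The Rust sketch below shows the same retry protocol in miniature; the data fields are toy stand-ins, not the extension's actual shared-memory layout.

```rust
// Seqlock-style consistent read guarded by begin/end update counters,
// mirroring the retry loop in load_shard_map above. Toy fields only.
use std::sync::atomic::{AtomicU64, Ordering};

struct ShardMap {
    begin_update_counter: AtomicU64,
    end_update_counter: AtomicU64,
    n_shards: AtomicU64,
    stripe_size: AtomicU64,
}

fn read_consistent(map: &ShardMap) -> (u64, u64) {
    loop {
        let begin = map.begin_update_counter.load(Ordering::Acquire);
        let end = map.end_update_counter.load(Ordering::Acquire);
        let snapshot = (
            map.n_shards.load(Ordering::Relaxed),
            map.stripe_size.load(Ordering::Relaxed),
        );
        // A writer bumps begin before and end after its change, so the copy is
        // consistent only if both counters match and are still unchanged.
        if begin == end
            && begin == map.begin_update_counter.load(Ordering::Acquire)
            && end == map.end_update_counter.load(Ordering::Acquire)
        {
            return snapshot;
        }
    }
}

fn main() {
    let map = ShardMap {
        begin_update_counter: AtomicU64::new(0),
        end_update_counter: AtomicU64::new(0),
        n_shards: AtomicU64::new(4),
        stripe_size: AtomicU64::new(256),
    };
    assert_eq!(read_consistent(&map), (4, 256));
}
```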
|
||||
|
||||
#define MB (1024*1024)

shardno_t
get_shard_number(BufferTag* tag)
{
    shardno_t n_shards = load_shard_map(0, NULL);
    uint32 hash;

#if PG_MAJORVERSION_NUM < 16
    hash = murmurhash32(tag->rnode.relNode);
    hash = hash_combine(hash, murmurhash32(tag->blockNum/(MB/BLCKSZ)/stripe_size));
#else
    hash = murmurhash32(tag->relNumber);
    hash = hash_combine(hash, murmurhash32(tag->blockNum/(MB/BLCKSZ)/stripe_size));
#endif

    return hash % n_shards;
}
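get_shard_number above picks a shard by hashing the relation identifier together with the block's stripe index: `blockNum / (MB/BLCKSZ) / stripe_size` groups blocks into stripes of `stripe_size` megabytes (the `neon.stripe_size` GUC defined later), and the combined hash is reduced modulo the shard count. The sketch below restates that striping rule with a stand-in hash instead of Postgres's murmurhash32/hash_combine, so it illustrates the layout but not the exact placement the extension computes.

```rust
// Shard selection by striping, mirroring get_shard_number above: blocks are
// grouped into stripes of stripe_size MB and each (relation, stripe) pair is
// hashed onto a shard. DefaultHasher is a stand-in for murmurhash32 +
// hash_combine, so placements differ from the real extension.
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

const BLCKSZ: u64 = 8192;
const MB: u64 = 1024 * 1024;

fn shard_for_block(rel_number: u32, block_num: u32, stripe_size_mb: u64, n_shards: u64) -> u64 {
    let blocks_per_stripe = (MB / BLCKSZ) * stripe_size_mb;
    let stripe = block_num as u64 / blocks_per_stripe;

    let mut h = DefaultHasher::new();
    (rel_number, stripe).hash(&mut h);
    h.finish() % n_shards
}

fn main() {
    // Neighbouring blocks stay on one shard until a stripe boundary is crossed.
    assert_eq!(
        shard_for_block(16384, 0, 256, 4),
        shard_for_block(16384, 1, 256, 4)
    );
}
```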
|
||||
|
||||
static bool
|
||||
CheckConnstringUpdated()
|
||||
{
|
||||
if (!PagestoreShmemIsValid())
|
||||
return false;
|
||||
return pagestore_local_counter < pg_atomic_read_u64(&pagestore_shared->update_counter);
|
||||
}
|
||||
|
||||
static void
|
||||
ReloadConnstring()
|
||||
{
|
||||
if (!PagestoreShmemIsValid())
|
||||
return;
|
||||
LWLockAcquire(pagestore_shared->lock, LW_SHARED);
|
||||
strlcpy(local_pageserver_connstring, pagestore_shared->pageserver_connstring, sizeof(local_pageserver_connstring));
|
||||
pagestore_local_counter = pg_atomic_read_u64(&pagestore_shared->update_counter);
|
||||
LWLockRelease(pagestore_shared->lock);
|
||||
}
|
||||
|
||||
static bool
|
||||
pageserver_connect(int elevel)
|
||||
pageserver_connect(shardno_t shard_no, int elevel)
|
||||
{
|
||||
char *query;
|
||||
int ret;
|
||||
const char *keywords[3];
|
||||
const char *values[3];
|
||||
int n;
|
||||
PGconn* conn;
|
||||
WaitEventSet *wes;
|
||||
char connstr[MAX_PS_CONNSTR_LEN];
|
||||
|
||||
Assert(!connected);
|
||||
Assert(page_servers[shard_no].conn == NULL);
|
||||
|
||||
if (CheckConnstringUpdated())
|
||||
{
|
||||
ReloadConnstring();
|
||||
}
|
||||
(void)load_shard_map(shard_no, connstr); /* refresh page map if needed */
|
||||
|
||||
/*
|
||||
* Connect using the connection string we got from the
|
||||
@@ -164,50 +244,47 @@ pageserver_connect(int elevel)
|
||||
n++;
|
||||
}
|
||||
keywords[n] = "dbname";
|
||||
values[n] = local_pageserver_connstring;
|
||||
values[n] = connstr;
|
||||
n++;
|
||||
keywords[n] = NULL;
|
||||
values[n] = NULL;
|
||||
n++;
|
||||
pageserver_conn = PQconnectdbParams(keywords, values, 1);
|
||||
conn = PQconnectdbParams(keywords, values, 1);
|
||||
|
||||
if (PQstatus(pageserver_conn) == CONNECTION_BAD)
|
||||
if (PQstatus(conn) == CONNECTION_BAD)
|
||||
{
|
||||
char *msg = pchomp(PQerrorMessage(pageserver_conn));
|
||||
char *msg = pchomp(PQerrorMessage(conn));
|
||||
|
||||
PQfinish(pageserver_conn);
|
||||
pageserver_conn = NULL;
|
||||
PQfinish(conn);
|
||||
|
||||
ereport(elevel,
|
||||
(errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
|
||||
errmsg(NEON_TAG "could not establish connection to pageserver"),
|
||||
errmsg(NEON_TAG "[shard %d] could not establish connection to pageserver", shard_no),
|
||||
errdetail_internal("%s", msg)));
|
||||
return false;
|
||||
}
|
||||
|
||||
query = psprintf("pagestream %s %s", neon_tenant, neon_timeline);
|
||||
ret = PQsendQuery(pageserver_conn, query);
|
||||
ret = PQsendQuery(conn, query);
|
||||
if (ret != 1)
|
||||
{
|
||||
PQfinish(pageserver_conn);
|
||||
pageserver_conn = NULL;
|
||||
neon_log(elevel, "could not send pagestream command to pageserver");
|
||||
PQfinish(conn);
|
||||
neon_shard_log(shard_no, elevel, "could not send pagestream command to pageserver");
|
||||
return false;
|
||||
}
|
||||
|
||||
pageserver_conn_wes = CreateWaitEventSet(TopMemoryContext, 3);
|
||||
AddWaitEventToSet(pageserver_conn_wes, WL_LATCH_SET, PGINVALID_SOCKET,
|
||||
wes = CreateWaitEventSet(TopMemoryContext, 3);
|
||||
AddWaitEventToSet(wes, WL_LATCH_SET, PGINVALID_SOCKET,
|
||||
MyLatch, NULL);
|
||||
AddWaitEventToSet(pageserver_conn_wes, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET,
|
||||
AddWaitEventToSet(wes, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET,
|
||||
NULL, NULL);
|
||||
AddWaitEventToSet(pageserver_conn_wes, WL_SOCKET_READABLE, PQsocket(pageserver_conn), NULL, NULL);
|
||||
AddWaitEventToSet(wes, WL_SOCKET_READABLE, PQsocket(conn), NULL, NULL);
|
||||
|
||||
while (PQisBusy(pageserver_conn))
|
||||
while (PQisBusy(conn))
|
||||
{
|
||||
WaitEvent event;
|
||||
|
||||
/* Sleep until there's something to do */
|
||||
(void) WaitEventSetWait(pageserver_conn_wes, -1L, &event, 1, PG_WAIT_EXTENSION);
|
||||
(void) WaitEventSetWait(wes, -1L, &event, 1, PG_WAIT_EXTENSION);
|
||||
ResetLatch(MyLatch);
|
||||
|
||||
CHECK_FOR_INTERRUPTS();
|
||||
@@ -215,25 +292,25 @@ pageserver_connect(int elevel)
|
||||
/* Data available in socket? */
|
||||
if (event.events & WL_SOCKET_READABLE)
|
||||
{
|
||||
if (!PQconsumeInput(pageserver_conn))
|
||||
if (!PQconsumeInput(conn))
|
||||
{
|
||||
char *msg = pchomp(PQerrorMessage(pageserver_conn));
|
||||
char *msg = pchomp(PQerrorMessage(conn));
|
||||
|
||||
PQfinish(pageserver_conn);
|
||||
pageserver_conn = NULL;
|
||||
FreeWaitEventSet(pageserver_conn_wes);
|
||||
pageserver_conn_wes = NULL;
|
||||
PQfinish(conn);
|
||||
FreeWaitEventSet(wes);
|
||||
|
||||
neon_log(elevel, "could not complete handshake with pageserver: %s",
|
||||
msg);
|
||||
neon_shard_log(shard_no, elevel, "could not complete handshake with pageserver: %s",
|
||||
msg);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
neon_log(LOG, "libpagestore: connected to '%s'", page_server_connstring);
|
||||
neon_shard_log(shard_no, LOG, "libpagestore: connected to '%s'", connstr);
|
||||
page_servers[shard_no].conn = conn;
|
||||
page_servers[shard_no].wes = wes;
|
||||
max_attached_shard_no = Max(shard_no+1, max_attached_shard_no);
|
||||
|
||||
connected = true;
|
||||
return true;
|
||||
}
|
||||
|
||||
@@ -241,10 +318,10 @@ pageserver_connect(int elevel)
|
||||
* A wrapper around PQgetCopyData that checks for interrupts while sleeping.
|
||||
*/
|
||||
static int
|
||||
call_PQgetCopyData(char **buffer)
|
||||
call_PQgetCopyData(shardno_t shard_no, char **buffer)
|
||||
{
|
||||
int ret;
|
||||
|
||||
PGconn* pageserver_conn = page_servers[shard_no].conn;
|
||||
retry:
|
||||
ret = PQgetCopyData(pageserver_conn, buffer, 1 /* async */ );
|
||||
|
||||
@@ -253,7 +330,7 @@ retry:
|
||||
WaitEvent event;
|
||||
|
||||
/* Sleep until there's something to do */
|
||||
(void) WaitEventSetWait(pageserver_conn_wes, -1L, &event, 1, PG_WAIT_EXTENSION);
|
||||
(void) WaitEventSetWait(page_servers[shard_no].wes, -1L, &event, 1, PG_WAIT_EXTENSION);
|
||||
ResetLatch(MyLatch);
|
||||
|
||||
CHECK_FOR_INTERRUPTS();
|
||||
@@ -265,7 +342,7 @@ retry:
|
||||
{
|
||||
char *msg = pchomp(PQerrorMessage(pageserver_conn));
|
||||
|
||||
neon_log(LOG, "could not get response from pageserver: %s", msg);
|
||||
neon_shard_log(shard_no, LOG, "could not get response from pageserver: %s", msg);
|
||||
pfree(msg);
|
||||
return -1;
|
||||
}
|
||||
@@ -279,7 +356,7 @@ retry:
|
||||
|
||||
|
||||
static void
|
||||
pageserver_disconnect(void)
|
||||
pageserver_disconnect(shardno_t shard_no)
|
||||
{
|
||||
/*
|
||||
* If anything goes wrong while we were sending a request, it's not clear
|
||||
@@ -288,38 +365,32 @@ pageserver_disconnect(void)
|
||||
* time later after we have already sent a new unrelated request. Close
|
||||
* the connection to avoid getting confused.
|
||||
*/
|
||||
if (connected)
|
||||
if (page_servers[shard_no].conn)
|
||||
{
|
||||
neon_log(LOG, "dropping connection to page server due to error");
|
||||
PQfinish(pageserver_conn);
|
||||
pageserver_conn = NULL;
|
||||
connected = false;
|
||||
neon_shard_log(shard_no, LOG, "dropping connection to page server due to error");
|
||||
PQfinish(page_servers[shard_no].conn);
|
||||
page_servers[shard_no].conn = NULL;
|
||||
|
||||
prefetch_on_ps_disconnect();
|
||||
}
|
||||
if (pageserver_conn_wes != NULL)
|
||||
if (page_servers[shard_no].wes != NULL)
|
||||
{
|
||||
FreeWaitEventSet(pageserver_conn_wes);
|
||||
pageserver_conn_wes = NULL;
|
||||
FreeWaitEventSet(page_servers[shard_no].wes);
|
||||
page_servers[shard_no].wes = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
static bool
|
||||
pageserver_send(NeonRequest *request)
|
||||
pageserver_send(shardno_t shard_no, NeonRequest *request)
|
||||
{
|
||||
StringInfoData req_buff;
|
||||
|
||||
if (CheckConnstringUpdated())
|
||||
{
|
||||
pageserver_disconnect();
|
||||
ReloadConnstring();
|
||||
}
|
||||
PGconn* pageserver_conn = page_servers[shard_no].conn;
|
||||
|
||||
/* If the connection was lost for some reason, reconnect */
|
||||
if (connected && PQstatus(pageserver_conn) == CONNECTION_BAD)
|
||||
if (pageserver_conn && PQstatus(pageserver_conn) == CONNECTION_BAD)
|
||||
{
|
||||
neon_log(LOG, "pageserver_send disconnect bad connection");
|
||||
pageserver_disconnect();
|
||||
neon_shard_log(shard_no, LOG, "pageserver_send disconnect bad connection");
|
||||
pageserver_disconnect(shard_no);
|
||||
}
|
||||
|
||||
req_buff = nm_pack_request(request);
|
||||
@@ -333,9 +404,9 @@ pageserver_send(NeonRequest *request)
|
||||
* https://github.com/neondatabase/neon/issues/1138 So try to reestablish
|
||||
* connection in case of failure.
|
||||
*/
|
||||
if (!connected)
|
||||
if (!page_servers[shard_no].conn)
|
||||
{
|
||||
while (!pageserver_connect(n_reconnect_attempts < max_reconnect_attempts ? LOG : ERROR))
|
||||
while (!pageserver_connect(shard_no, n_reconnect_attempts < max_reconnect_attempts ? LOG : ERROR))
|
||||
{
|
||||
HandleMainLoopInterrupts();
|
||||
n_reconnect_attempts += 1;
|
||||
@@ -344,7 +415,9 @@ pageserver_send(NeonRequest *request)
|
||||
n_reconnect_attempts = 0;
|
||||
}
|
||||
|
||||
/*
|
||||
pageserver_conn = page_servers[shard_no].conn;
|
||||
|
||||
/*
|
||||
* Send request.
|
||||
*
|
||||
* In principle, this could block if the output buffer is full, and we
|
||||
@@ -355,9 +428,8 @@ pageserver_send(NeonRequest *request)
|
||||
if (PQputCopyData(pageserver_conn, req_buff.data, req_buff.len) <= 0)
|
||||
{
|
||||
char *msg = pchomp(PQerrorMessage(pageserver_conn));
|
||||
|
||||
pageserver_disconnect();
|
||||
neon_log(LOG, "pageserver_send disconnect because failed to send page request (try to reconnect): %s", msg);
|
||||
pageserver_disconnect(shard_no);
|
||||
neon_shard_log(shard_no, LOG, "pageserver_send disconnect because failed to send page request (try to reconnect): %s", msg);
|
||||
pfree(msg);
|
||||
pfree(req_buff.data);
|
||||
return false;
|
||||
@@ -369,19 +441,19 @@ pageserver_send(NeonRequest *request)
|
||||
{
|
||||
char *msg = nm_to_string((NeonMessage *) request);
|
||||
|
||||
neon_log(PageStoreTrace, "sent request: %s", msg);
|
||||
neon_shard_log(shard_no, PageStoreTrace, "sent request: %s", msg);
|
||||
pfree(msg);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
static NeonResponse *
|
||||
pageserver_receive(void)
|
||||
pageserver_receive(shardno_t shard_no)
|
||||
{
|
||||
StringInfoData resp_buff;
|
||||
NeonResponse *resp;
|
||||
|
||||
if (!connected)
|
||||
PGconn* pageserver_conn = page_servers[shard_no].conn;
|
||||
if (!pageserver_conn)
|
||||
return NULL;
|
||||
|
||||
PG_TRY();
|
||||
@@ -389,7 +461,7 @@ pageserver_receive(void)
|
||||
/* read response */
|
||||
int rc;
|
||||
|
||||
rc = call_PQgetCopyData(&resp_buff.data);
|
||||
rc = call_PQgetCopyData(shard_no, &resp_buff.data);
|
||||
if (rc >= 0)
|
||||
{
|
||||
resp_buff.len = rc;
|
||||
@@ -401,33 +473,33 @@ pageserver_receive(void)
|
||||
{
|
||||
char *msg = nm_to_string((NeonMessage *) resp);
|
||||
|
||||
neon_log(PageStoreTrace, "got response: %s", msg);
|
||||
neon_shard_log(shard_no, PageStoreTrace, "got response: %s", msg);
|
||||
pfree(msg);
|
||||
}
|
||||
}
|
||||
else if (rc == -1)
|
||||
{
|
||||
neon_log(LOG, "pageserver_receive disconnect because call_PQgetCopyData returns -1: %s", pchomp(PQerrorMessage(pageserver_conn)));
|
||||
pageserver_disconnect();
|
||||
neon_shard_log(shard_no, LOG, "pageserver_receive disconnect because call_PQgetCopyData returns -1: %s", pchomp(PQerrorMessage(pageserver_conn)));
|
||||
pageserver_disconnect(shard_no);
|
||||
resp = NULL;
|
||||
}
|
||||
else if (rc == -2)
|
||||
{
|
||||
char *msg = pchomp(PQerrorMessage(pageserver_conn));
|
||||
|
||||
pageserver_disconnect();
|
||||
neon_log(ERROR, "pageserver_receive disconnect because could not read COPY data: %s", msg);
|
||||
pageserver_disconnect(shard_no);
|
||||
neon_shard_log(shard_no, ERROR, "pageserver_receive disconnect because could not read COPY data: %s", msg);
|
||||
}
|
||||
else
|
||||
{
|
||||
pageserver_disconnect();
|
||||
neon_log(ERROR, "pageserver_receive disconnect because unexpected PQgetCopyData return value: %d", rc);
|
||||
pageserver_disconnect(shard_no);
|
||||
neon_shard_log(shard_no, ERROR, "pageserver_receive disconnect because unexpected PQgetCopyData return value: %d", rc);
|
||||
}
|
||||
}
|
||||
PG_CATCH();
|
||||
{
|
||||
neon_log(LOG, "pageserver_receive disconnect due to caught exception");
|
||||
pageserver_disconnect();
|
||||
neon_shard_log(shard_no, LOG, "pageserver_receive disconnect due to caught exception");
|
||||
pageserver_disconnect(shard_no);
|
||||
PG_RE_THROW();
|
||||
}
|
||||
PG_END_TRY();
|
||||
@@ -437,11 +509,12 @@ pageserver_receive(void)
|
||||
|
||||
|
||||
static bool
|
||||
pageserver_flush(void)
|
||||
pageserver_flush(shardno_t shard_no)
|
||||
{
|
||||
if (!connected)
|
||||
PGconn* pageserver_conn = page_servers[shard_no].conn;
|
||||
if (!pageserver_conn)
|
||||
{
|
||||
neon_log(WARNING, "Tried to flush while disconnected");
|
||||
neon_shard_log(shard_no, WARNING, "Tried to flush while disconnected");
|
||||
}
|
||||
else
|
||||
{
|
||||
@@ -449,8 +522,8 @@ pageserver_flush(void)
|
||||
{
|
||||
char *msg = pchomp(PQerrorMessage(pageserver_conn));
|
||||
|
||||
pageserver_disconnect();
|
||||
neon_log(LOG, "pageserver_flush disconnect because failed to flush page requests: %s", msg);
|
||||
pageserver_disconnect(shard_no);
|
||||
neon_shard_log(shard_no, LOG, "pageserver_flush disconnect because failed to flush page requests: %s", msg);
|
||||
pfree(msg);
|
||||
return false;
|
||||
}
|
||||
@@ -473,63 +546,61 @@ check_neon_id(char **newval, void **extra, GucSource source)
|
||||
return **newval == '\0' || HexDecodeString(id, *newval, 16);
|
||||
}
|
||||
|
||||
static Size
|
||||
PagestoreShmemSize(void)
|
||||
static void
|
||||
AssignPageserverConnstring(const char *newval, void *extra)
|
||||
{
|
||||
return sizeof(PagestoreShmemState);
|
||||
}
|
||||
|
||||
static bool
|
||||
PagestoreShmemInit(void)
|
||||
{
|
||||
bool found;
|
||||
|
||||
LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);
|
||||
pagestore_shared = ShmemInitStruct("libpagestore shared state",
|
||||
PagestoreShmemSize(),
|
||||
&found);
|
||||
if (!found)
|
||||
/*
|
||||
* Load shard map only at Postmaster.
|
||||
* If old page server is not available, then backends can be blocked in attempts to reconnect to it and do not reload config in this loop
|
||||
*/
|
||||
if (shard_map != NULL && UsedShmemSegAddr != NULL && (MyProcPid == PostmasterPid || shard_map->n_shards == 0))
|
||||
{
|
||||
pagestore_shared->lock = &(GetNamedLWLockTranche("neon_libpagestore")->lock);
|
||||
pg_atomic_init_u64(&pagestore_shared->update_counter, 0);
|
||||
AssignPageserverConnstring(page_server_connstring, NULL);
|
||||
char const* shard_connstr = newval;
|
||||
char const* sep;
|
||||
size_t connstr_len;
|
||||
int i = 0;
|
||||
bool shard_map_changed = false;
|
||||
do
|
||||
{
|
||||
sep = strchr(shard_connstr, ',');
|
||||
connstr_len = sep != NULL ? sep - shard_connstr : strlen(shard_connstr);
|
||||
if (connstr_len == 0)
|
||||
break; /* trailing comma */
|
||||
if (i >= MAX_SHARDS)
|
||||
{
|
||||
neon_log(LOG, "Too many shards");
|
||||
return;
|
||||
}
|
||||
if (connstr_len >= MAX_PS_CONNSTR_LEN)
|
||||
{
|
||||
neon_log(LOG, "Connection string too long");
|
||||
return;
|
||||
}
|
||||
if (i >= shard_map->n_shards ||
|
||||
strcmp(shard_map->shard_connstr[i], shard_connstr) != 0)
|
||||
{
|
||||
if (!shard_map_changed)
|
||||
{
|
||||
pg_atomic_add_fetch_u64(&shard_map->begin_update_counter, 1);
|
||||
shard_map_changed = true;
|
||||
}
|
||||
memcpy(shard_map->shard_connstr[i], shard_connstr, connstr_len+1);
|
||||
}
|
||||
shard_connstr = sep + 1;
|
||||
i += 1;
|
||||
} while (sep != NULL);
|
||||
|
||||
if (i == 0)
|
||||
{
|
||||
neon_log(LOG, "No shards were specified");
|
||||
return;
|
||||
}
|
||||
if (shard_map_changed)
|
||||
{
|
||||
shard_map->n_shards = i;
|
||||
pg_atomic_add_fetch_u64(&shard_map->end_update_counter, 1);
|
||||
}
|
||||
}
|
||||
LWLockRelease(AddinShmemInitLock);
|
||||
return found;
|
||||
}
|
||||
|
||||
static void
|
||||
pagestore_shmem_startup_hook(void)
|
||||
{
|
||||
if (prev_shmem_startup_hook)
|
||||
prev_shmem_startup_hook();
|
||||
|
||||
PagestoreShmemInit();
|
||||
}
|
||||
|
||||
static void
|
||||
pagestore_shmem_request(void)
|
||||
{
|
||||
#if PG_VERSION_NUM >= 150000
|
||||
if (prev_shmem_request_hook)
|
||||
prev_shmem_request_hook();
|
||||
#endif
|
||||
|
||||
RequestAddinShmemSpace(PagestoreShmemSize());
|
||||
RequestNamedLWLockTranche("neon_libpagestore", 1);
|
||||
}
|
||||
|
||||
static void
|
||||
pagestore_prepare_shmem(void)
|
||||
{
|
||||
#if PG_VERSION_NUM >= 150000
|
||||
prev_shmem_request_hook = shmem_request_hook;
|
||||
shmem_request_hook = pagestore_shmem_request;
|
||||
#else
|
||||
pagestore_shmem_request();
|
||||
#endif
|
||||
prev_shmem_startup_hook = shmem_startup_hook;
|
||||
shmem_startup_hook = pagestore_shmem_startup_hook;
|
||||
}
|
||||
|
||||
/*
|
||||
@@ -538,8 +609,6 @@ pagestore_prepare_shmem(void)
|
||||
void
|
||||
pg_init_libpagestore(void)
|
||||
{
|
||||
pagestore_prepare_shmem();
|
||||
|
||||
DefineCustomStringVariable("neon.pageserver_connstring",
|
||||
"connection string to the page server",
|
||||
NULL,
|
||||
@@ -547,7 +616,7 @@ pg_init_libpagestore(void)
|
||||
"",
|
||||
PGC_SIGHUP,
|
||||
0, /* no flags required */
|
||||
CheckPageserverConnstring, AssignPageserverConnstring, NULL);
|
||||
NULL, AssignPageserverConnstring, NULL);
|
||||
|
||||
DefineCustomStringVariable("neon.timeline_id",
|
||||
"Neon timeline_id the server is running on",
|
||||
@@ -567,6 +636,15 @@ pg_init_libpagestore(void)
|
||||
0, /* no flags required */
|
||||
check_neon_id, NULL, NULL);
|
||||
|
||||
DefineCustomIntVariable("neon.stripe_size",
|
||||
"sharding sripe size",
|
||||
NULL,
|
||||
&stripe_size,
|
||||
256, 1, INT_MAX,
|
||||
PGC_SIGHUP,
|
||||
GUC_UNIT_MB,
|
||||
NULL, NULL, NULL);
|
||||
|
||||
DefineCustomIntVariable("neon.max_cluster_size",
|
||||
"cluster size limit",
|
||||
NULL,
|
||||
@@ -632,4 +710,5 @@ pg_init_libpagestore(void)
|
||||
}
|
||||
|
||||
lfc_init();
|
||||
psm_init();
|
||||
}
|
||||
|
||||
@@ -16,16 +16,21 @@
|
||||
#include "postgres.h"
|
||||
#include "neon_pgversioncompat.h"
|
||||
|
||||
#include "access/slru.h"
|
||||
#include "access/xlogdefs.h"
|
||||
#include RELFILEINFO_HDR
|
||||
#include "storage/block.h"
|
||||
#include "storage/smgr.h"
|
||||
#include "storage/buf_internals.h"
|
||||
#include "lib/stringinfo.h"
|
||||
#include "libpq/pqformat.h"
|
||||
#include "utils/memutils.h"
|
||||
|
||||
#include "pg_config.h"
|
||||
|
||||
#define MAX_SHARDS 128
|
||||
#define MAX_PS_CONNSTR_LEN 128
|
||||
|
||||
typedef enum
|
||||
{
|
||||
/* pagestore_client -> pagestore */
|
||||
@@ -33,6 +38,7 @@ typedef enum
|
||||
T_NeonNblocksRequest,
|
||||
T_NeonGetPageRequest,
|
||||
T_NeonDbSizeRequest,
|
||||
T_NeonGetSlruSegmentRequest,
|
||||
|
||||
/* pagestore -> pagestore_client */
|
||||
T_NeonExistsResponse = 100,
|
||||
@@ -40,6 +46,7 @@ typedef enum
|
||||
T_NeonGetPageResponse,
|
||||
T_NeonErrorResponse,
|
||||
T_NeonDbSizeResponse,
|
||||
T_NeonGetSlruSegmentResponse,
|
||||
} NeonMessageTag;
|
||||
|
||||
/* base struct for c-style inheritance */
|
||||
@@ -54,6 +61,9 @@ typedef struct
|
||||
#define neon_log(tag, fmt, ...) ereport(tag, \
|
||||
(errmsg(NEON_TAG fmt, ##__VA_ARGS__), \
|
||||
errhidestmt(true), errhidecontext(true), errposition(0), internalerrposition(0)))
|
||||
#define neon_shard_log(shard_no, tag, fmt, ...) ereport(tag, \
|
||||
(errmsg(NEON_TAG "[shard %d] " fmt, shard_no, ##__VA_ARGS__), \
|
||||
errhidestmt(true), errhidecontext(true), errposition(0), internalerrposition(0)))
|
||||
|
||||
/*
|
||||
* supertype of all the Neon*Request structs below
|
||||
@@ -97,6 +107,13 @@ typedef struct
|
||||
BlockNumber blkno;
|
||||
} NeonGetPageRequest;
|
||||
|
||||
typedef struct
|
||||
{
|
||||
NeonRequest req;
|
||||
SlruKind kind;
|
||||
int segno;
|
||||
} NeonGetSlruSegmentRequest;
|
||||
|
||||
/* supertype of all the Neon*Response structs below */
|
||||
typedef struct
|
||||
{
|
||||
@@ -136,6 +153,14 @@ typedef struct
|
||||
* message */
|
||||
} NeonErrorResponse;
|
||||
|
||||
typedef struct
|
||||
{
|
||||
NeonMessageTag tag;
|
||||
int n_blocks;
|
||||
char data[BLCKSZ * SLRU_PAGES_PER_SEGMENT];
|
||||
} NeonGetSlruSegmentResponse;
|
||||
|
||||
|
||||
extern StringInfoData nm_pack_request(NeonRequest *msg);
|
||||
extern NeonResponse *nm_unpack_response(StringInfo s);
|
||||
extern char *nm_to_string(NeonMessage *msg);
|
||||
@@ -144,11 +169,13 @@ extern char *nm_to_string(NeonMessage *msg);
|
||||
* API
|
||||
*/
|
||||
|
||||
typedef unsigned shardno_t;
|
||||
|
||||
typedef struct
|
||||
{
|
||||
bool (*send) (NeonRequest *request);
|
||||
NeonResponse *(*receive) (void);
|
||||
bool (*flush) (void);
|
||||
bool (*send) (shardno_t shard_no, NeonRequest * request);
|
||||
NeonResponse *(*receive) (shardno_t shard_no);
|
||||
bool (*flush) (shardno_t shard_no);
|
||||
} page_server_api;
|
||||
|
||||
extern void prefetch_on_ps_disconnect(void);
|
||||
@@ -165,6 +192,8 @@ extern char *neon_tenant;
|
||||
extern bool wal_redo;
|
||||
extern int32 max_cluster_size;
|
||||
|
||||
extern shardno_t get_shard_number(BufferTag* tag);
|
||||
|
||||
extern const f_smgr *smgr_neon(BackendId backend, NRelFileInfo rinfo);
|
||||
extern void smgr_init_neon(void);
|
||||
extern void readahead_buffer_resize(int newsize, void *extra);
|
||||
|
||||
@@ -168,6 +168,7 @@ typedef struct PrefetchRequest
|
||||
XLogRecPtr actual_request_lsn;
|
||||
NeonResponse *response; /* may be null */
|
||||
PrefetchStatus status;
|
||||
shardno_t shard_no;
|
||||
uint64 my_ring_index;
|
||||
} PrefetchRequest;
|
||||
|
||||
@@ -235,7 +236,9 @@ typedef struct PrefetchState
|
||||
* also unused */
|
||||
|
||||
/* the buffers */
|
||||
prfh_hash *prf_hash;
|
||||
prfh_hash *prf_hash;
|
||||
int max_shard_no;
|
||||
uint8 shard_bitmap[(MAX_SHARDS + 7)/8];
|
||||
PrefetchRequest prf_buffer[]; /* prefetch buffers */
|
||||
} PrefetchState;
|
||||
|
||||
@@ -323,6 +326,7 @@ compact_prefetch_buffers(void)
|
||||
Assert(target_slot->status == PRFS_UNUSED);
|
||||
|
||||
target_slot->buftag = source_slot->buftag;
|
||||
target_slot->shard_no = source_slot->shard_no;
|
||||
target_slot->status = source_slot->status;
|
||||
target_slot->response = source_slot->response;
|
||||
target_slot->effective_request_lsn = source_slot->effective_request_lsn;
|
||||
@@ -490,6 +494,23 @@ prefetch_cleanup_trailing_unused(void)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
static bool
|
||||
prefetch_flush_requests(void)
|
||||
{
|
||||
for (shardno_t shard_no = 0; shard_no < MyPState->max_shard_no; shard_no++)
|
||||
{
|
||||
if (MyPState->shard_bitmap[shard_no >> 3] & (1 << (shard_no & 7)))
|
||||
{
|
||||
if (!page_server->flush(shard_no))
|
||||
return false;
|
||||
MyPState->shard_bitmap[shard_no >> 3] &= ~(1 << (shard_no & 7));
|
||||
}
|
||||
}
|
||||
MyPState->max_shard_no = 0;
|
||||
return true;
|
||||
}
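prefetch_flush_requests above flushes only the shards that actually have queued prefetch requests, using a small per-shard bitmap plus a `max_shard_no` high-water mark, and clears each bit once that shard's connection has been flushed. A hedged Rust sketch of the same bookkeeping, where the flush callback stands in for `page_server->flush(shard_no)`:

```rust
// Tracking which shards have unflushed requests in a bitmap and flushing
// only those, mirroring prefetch_flush_requests above. Illustrative only.
const MAX_SHARDS: usize = 128;

struct PrefetchState {
    max_shard_no: usize,
    shard_bitmap: [u8; (MAX_SHARDS + 7) / 8],
}

impl PrefetchState {
    fn mark(&mut self, shard_no: usize) {
        self.shard_bitmap[shard_no >> 3] |= 1 << (shard_no & 7);
        self.max_shard_no = self.max_shard_no.max(shard_no + 1);
    }

    fn flush_all(&mut self, mut flush: impl FnMut(usize) -> bool) -> bool {
        for shard_no in 0..self.max_shard_no {
            if (self.shard_bitmap[shard_no >> 3] & (1 << (shard_no & 7))) != 0 {
                if !flush(shard_no) {
                    return false; // bit stays set, so the flush is retried later
                }
                self.shard_bitmap[shard_no >> 3] &= !(1 << (shard_no & 7));
            }
        }
        self.max_shard_no = 0;
        true
    }
}

fn main() {
    let mut st = PrefetchState { max_shard_no: 0, shard_bitmap: [0; (MAX_SHARDS + 7) / 8] };
    st.mark(2);
    st.mark(5);
    let mut flushed = Vec::new();
    assert!(st.flush_all(|s| { flushed.push(s); true }));
    assert_eq!(flushed, vec![2, 5]);
}
```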
|
||||
|
||||
/*
|
||||
* Wait for slot of ring_index to have received its response.
|
||||
* The caller is responsible for making sure the request buffer is flushed.
|
||||
@@ -505,7 +526,7 @@ prefetch_wait_for(uint64 ring_index)
|
||||
if (MyPState->ring_flush <= ring_index &&
|
||||
MyPState->ring_unused > MyPState->ring_flush)
|
||||
{
|
||||
if (!page_server->flush())
|
||||
if (!prefetch_flush_requests())
|
||||
return false;
|
||||
MyPState->ring_flush = MyPState->ring_unused;
|
||||
}
|
||||
@@ -543,7 +564,7 @@ prefetch_read(PrefetchRequest *slot)
|
||||
Assert(slot->my_ring_index == MyPState->ring_receive);
|
||||
|
||||
old = MemoryContextSwitchTo(MyPState->errctx);
|
||||
response = (NeonResponse *) page_server->receive();
|
||||
response = (NeonResponse *) page_server->receive(slot->shard_no);
|
||||
MemoryContextSwitchTo(old);
|
||||
if (response)
|
||||
{
|
||||
@@ -700,12 +721,14 @@ prefetch_do_request(PrefetchRequest *slot, bool *force_latest, XLogRecPtr *force
|
||||
Assert(slot->response == NULL);
|
||||
Assert(slot->my_ring_index == MyPState->ring_unused);
|
||||
|
||||
while (!page_server->send((NeonRequest *) &request));
|
||||
while (!page_server->send(slot->shard_no, (NeonRequest *) &request));
|
||||
|
||||
/* update prefetch state */
|
||||
MyPState->n_requests_inflight += 1;
|
||||
MyPState->n_unused -= 1;
|
||||
MyPState->ring_unused += 1;
|
||||
MyPState->shard_bitmap[slot->shard_no >> 3] |= 1 << (slot->shard_no & 7);
|
||||
MyPState->max_shard_no = Max(slot->shard_no+1, MyPState->max_shard_no);
|
||||
|
||||
/* update slot state */
|
||||
slot->status = PRFS_REQUESTED;
|
||||
@@ -876,6 +899,7 @@ Retry:
|
||||
* function reads the buffer tag from the slot.
|
||||
*/
|
||||
slot->buftag = tag;
|
||||
slot->shard_no = get_shard_number(&tag);
|
||||
slot->my_ring_index = ring_index;
|
||||
|
||||
prefetch_do_request(slot, force_latest, force_lsn);
|
||||
@@ -886,7 +910,7 @@ Retry:
|
||||
if (flush_every_n_requests > 0 &&
|
||||
MyPState->ring_unused - MyPState->ring_flush >= flush_every_n_requests)
|
||||
{
|
||||
if (!page_server->flush())
|
||||
if (!prefetch_flush_requests())
|
||||
{
|
||||
/*
|
||||
* Prefetch set is reset in case of error, so we should try to
|
||||
@@ -904,13 +928,44 @@ static NeonResponse *
|
||||
page_server_request(void const *req)
|
||||
{
|
||||
NeonResponse *resp;
|
||||
BufferTag tag = {0};
|
||||
shardno_t shard_no;
|
||||
|
||||
switch (((NeonRequest *) req)->tag)
|
||||
{
|
||||
case T_NeonExistsRequest:
|
||||
CopyNRelFileInfoToBufTag(tag, ((NeonExistsRequest *) req)->rinfo);
|
||||
break;
|
||||
case T_NeonNblocksRequest:
|
||||
CopyNRelFileInfoToBufTag(tag, ((NeonNblocksRequest *) req)->rinfo);
|
||||
break;
|
||||
case T_NeonDbSizeRequest:
|
||||
NInfoGetDbOid(BufTagGetNRelFileInfo(tag)) = ((NeonDbSizeRequest *) req)->dbNode;
|
||||
break;
|
||||
case T_NeonGetPageRequest:
|
||||
CopyNRelFileInfoToBufTag(tag, ((NeonGetPageRequest *) req)->rinfo);
|
||||
tag.blockNum = ((NeonGetPageRequest *) req)->blkno;
|
||||
break;
|
||||
default:
|
||||
neon_log(ERROR, "Unexpected request tag: %d", ((NeonRequest *) req)->tag);
|
||||
}
|
||||
shard_no = get_shard_number(&tag);
|
||||
|
||||
|
||||
/*
|
||||
* TODO: temporary workarround - we stream all WAL only to shard 0, so metadata and forks other than main
|
||||
* should be requested from shard 0. We still need to call get_shard_no() to check if shard map is up-to-date
|
||||
*/
|
||||
if (((NeonRequest *) req)->tag != T_NeonGetPageRequest || ((NeonGetPageRequest *) req)->forknum != MAIN_FORKNUM)
|
||||
{
|
||||
shard_no = 0;
|
||||
}
|
||||
|
||||
do
|
||||
{
|
||||
while (!page_server->send((NeonRequest *) req) || !page_server->flush());
|
||||
MyPState->ring_flush = MyPState->ring_unused;
|
||||
while (!page_server->send(shard_no, (NeonRequest *) req) || !page_server->flush(shard_no));
|
||||
consume_prefetch_responses();
|
||||
resp = page_server->receive();
|
||||
resp = page_server->receive(shard_no);
|
||||
} while (resp == NULL);
|
||||
return resp;
|
||||
|
||||
@@ -979,14 +1034,27 @@ nm_pack_request(NeonRequest *msg)
|
||||
break;
|
||||
}
|
||||
|
||||
case T_NeonGetSlruSegmentRequest:
|
||||
{
|
||||
NeonGetSlruSegmentRequest *msg_req = (NeonGetSlruSegmentRequest *) msg;
|
||||
|
||||
pq_sendbyte(&s, msg_req->req.latest);
|
||||
pq_sendint64(&s, msg_req->req.lsn);
|
||||
pq_sendbyte(&s, msg_req->kind);
|
||||
pq_sendint32(&s, msg_req->segno);
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
/* pagestore -> pagestore_client. We never need to create these. */
|
||||
case T_NeonExistsResponse:
|
||||
case T_NeonNblocksResponse:
|
||||
case T_NeonGetPageResponse:
|
||||
case T_NeonErrorResponse:
|
||||
case T_NeonDbSizeResponse:
|
||||
case T_NeonGetSlruSegmentResponse:
|
||||
default:
|
||||
elog(ERROR, "unexpected neon message tag 0x%02x", msg->tag);
|
||||
neon_log(ERROR, "unexpected neon message tag 0x%02x", msg->tag);
|
||||
break;
|
||||
}
|
||||
return s;
|
||||
@@ -1071,6 +1139,20 @@ nm_unpack_response(StringInfo s)
|
||||
break;
|
||||
}
|
||||
|
||||
case T_NeonGetSlruSegmentResponse:
|
||||
{
|
||||
NeonGetSlruSegmentResponse *msg_resp;
|
||||
int n_blocks = pq_getmsgint(s, 4);
|
||||
msg_resp = palloc(sizeof(NeonGetSlruSegmentResponse));
|
||||
msg_resp->tag = tag;
|
||||
msg_resp->n_blocks = n_blocks;
|
||||
memcpy(msg_resp->data, pq_getmsgbytes(s, n_blocks * BLCKSZ), n_blocks * BLCKSZ);
|
||||
pq_getmsgend(s);
|
||||
|
||||
resp = (NeonResponse *) msg_resp;
|
||||
break;
|
||||
}
|
||||
|
||||
/*
|
||||
* pagestore_client -> pagestore
|
||||
*
|
||||
@@ -1080,8 +1162,9 @@ nm_unpack_response(StringInfo s)
|
||||
case T_NeonNblocksRequest:
|
||||
case T_NeonGetPageRequest:
|
||||
case T_NeonDbSizeRequest:
|
||||
case T_NeonGetSlruSegmentRequest:
|
||||
default:
|
||||
elog(ERROR, "unexpected neon message tag 0x%02x", tag);
|
||||
neon_log(ERROR, "unexpected neon message tag 0x%02x", tag);
|
||||
break;
|
||||
}
|
||||
|
||||
@@ -1149,7 +1232,18 @@ nm_to_string(NeonMessage *msg)
|
||||
appendStringInfoChar(&s, '}');
|
||||
break;
|
||||
}
|
||||
case T_NeonGetSlruSegmentRequest:
|
||||
{
|
||||
NeonGetSlruSegmentRequest *msg_req = (NeonGetSlruSegmentRequest *) msg;
|
||||
|
||||
appendStringInfoString(&s, "{\"type\": \"NeonGetSlruSegmentRequest\"");
|
||||
appendStringInfo(&s, ", \"kind\": %u", msg_req->kind);
|
||||
appendStringInfo(&s, ", \"segno\": %u", msg_req->segno);
|
||||
appendStringInfo(&s, ", \"lsn\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->req.lsn));
|
||||
appendStringInfo(&s, ", \"latest\": %d", msg_req->req.latest);
|
||||
appendStringInfoChar(&s, '}');
|
||||
break;
|
||||
}
|
||||
/* pagestore -> pagestore_client */
|
||||
case T_NeonExistsResponse:
|
||||
{
|
||||
@@ -1203,6 +1297,17 @@ nm_to_string(NeonMessage *msg)
|
||||
msg_resp->db_size);
|
||||
appendStringInfoChar(&s, '}');
|
||||
|
||||
break;
|
||||
}
|
||||
case T_NeonGetSlruSegmentResponse:
|
||||
{
|
||||
NeonGetSlruSegmentResponse *msg_resp = (NeonGetSlruSegmentResponse *) msg;
|
||||
|
||||
appendStringInfoString(&s, "{\"type\": \"NeonGetSlruSegmentResponse\"");
|
||||
appendStringInfo(&s, ", \"n_blocks\": %u}",
|
||||
msg_resp->n_blocks);
|
||||
appendStringInfoChar(&s, '}');
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
@@ -1273,7 +1378,7 @@ neon_wallog_page(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, co
|
||||
XLogFlush(recptr);
|
||||
lsn = recptr;
|
||||
ereport(SmgrTrace,
|
||||
(errmsg("Page %u of relation %u/%u/%u.%u was force logged. Evicted at lsn=%X/%X",
|
||||
(errmsg(NEON_TAG "Page %u of relation %u/%u/%u.%u was force logged. Evicted at lsn=%X/%X",
|
||||
blocknum,
|
||||
RelFileInfoFmt(InfoFromSMgrRel(reln)),
|
||||
forknum, LSN_FORMAT_ARGS(lsn))));
|
||||
@@ -1301,7 +1406,7 @@ neon_wallog_page(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, co
|
||||
if (PageIsNew((Page) buffer))
|
||||
{
|
||||
ereport(SmgrTrace,
|
||||
(errmsg("Page %u of relation %u/%u/%u.%u is all-zeros",
|
||||
(errmsg(NEON_TAG "Page %u of relation %u/%u/%u.%u is all-zeros",
|
||||
blocknum,
|
||||
RelFileInfoFmt(InfoFromSMgrRel(reln)),
|
||||
forknum)));
|
||||
@@ -1309,7 +1414,7 @@ neon_wallog_page(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, co
|
||||
else if (PageIsEmptyHeapPage((Page) buffer))
|
||||
{
|
||||
ereport(SmgrTrace,
|
||||
(errmsg("Page %u of relation %u/%u/%u.%u is an empty heap page with no LSN",
|
||||
(errmsg(NEON_TAG "Page %u of relation %u/%u/%u.%u is an empty heap page with no LSN",
|
||||
blocknum,
|
||||
RelFileInfoFmt(InfoFromSMgrRel(reln)),
|
||||
forknum)));
|
||||
@@ -1317,7 +1422,7 @@ neon_wallog_page(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, co
|
||||
else
|
||||
{
|
||||
ereport(PANIC,
|
||||
(errmsg("Page %u of relation %u/%u/%u.%u is evicted with zero LSN",
|
||||
(errmsg(NEON_TAG "Page %u of relation %u/%u/%u.%u is evicted with zero LSN",
|
||||
blocknum,
|
||||
RelFileInfoFmt(InfoFromSMgrRel(reln)),
|
||||
forknum)));
|
||||
@@ -1326,7 +1431,7 @@ neon_wallog_page(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, co
|
||||
else
|
||||
{
|
||||
ereport(SmgrTrace,
|
||||
(errmsg("Page %u of relation %u/%u/%u.%u is already wal logged at lsn=%X/%X",
|
||||
(errmsg(NEON_TAG "Page %u of relation %u/%u/%u.%u is already wal logged at lsn=%X/%X",
|
||||
blocknum,
|
||||
RelFileInfoFmt(InfoFromSMgrRel(reln)),
|
||||
forknum, LSN_FORMAT_ARGS(lsn))));
|
||||
@@ -1423,7 +1528,7 @@ neon_get_request_lsn(bool *latest, NRelFileInfo rinfo, ForkNumber forknum, Block
|
||||
lsn = GetLastWrittenLSN(rinfo, forknum, blkno);
|
||||
lsn = nm_adjust_lsn(lsn);
|
||||
|
||||
elog(DEBUG1, "neon_get_request_lsn GetXLogReplayRecPtr %X/%X request lsn 0 ",
|
||||
neon_log(DEBUG1, "neon_get_request_lsn GetXLogReplayRecPtr %X/%X request lsn 0 ",
|
||||
(uint32) ((lsn) >> 32), (uint32) (lsn));
|
||||
}
|
||||
else
|
||||
@@ -1438,7 +1543,7 @@ neon_get_request_lsn(bool *latest, NRelFileInfo rinfo, ForkNumber forknum, Block
|
||||
*latest = true;
|
||||
lsn = GetLastWrittenLSN(rinfo, forknum, blkno);
|
||||
Assert(lsn != InvalidXLogRecPtr);
|
||||
elog(DEBUG1, "neon_get_request_lsn GetLastWrittenLSN lsn %X/%X ",
|
||||
neon_log(DEBUG1, "neon_get_request_lsn GetLastWrittenLSN lsn %X/%X ",
|
||||
(uint32) ((lsn) >> 32), (uint32) (lsn));
|
||||
|
||||
lsn = nm_adjust_lsn(lsn);
|
||||
@@ -1458,7 +1563,7 @@ neon_get_request_lsn(bool *latest, NRelFileInfo rinfo, ForkNumber forknum, Block
|
||||
#endif
|
||||
if (lsn > flushlsn)
|
||||
{
|
||||
elog(DEBUG5, "last-written LSN %X/%X is ahead of last flushed LSN %X/%X",
|
||||
neon_log(DEBUG5, "last-written LSN %X/%X is ahead of last flushed LSN %X/%X",
|
||||
(uint32) (lsn >> 32), (uint32) lsn,
|
||||
(uint32) (flushlsn >> 32), (uint32) flushlsn);
|
||||
XLogFlush(lsn);
|
||||
@@ -1502,7 +1607,7 @@ neon_exists(SMgrRelation reln, ForkNumber forkNum)
|
||||
return mdexists(reln, forkNum);
|
||||
|
||||
default:
|
||||
elog(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence);
|
||||
neon_log(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence);
|
||||
}
|
||||
|
||||
if (get_cached_relsize(InfoFromSMgrRel(reln), forkNum, &n_blocks))
|
||||
@@ -1554,7 +1659,7 @@ neon_exists(SMgrRelation reln, ForkNumber forkNum)
|
||||
case T_NeonErrorResponse:
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_IO_ERROR),
|
||||
errmsg("could not read relation existence of rel %u/%u/%u.%u from page server at lsn %X/%08X",
|
||||
errmsg(NEON_TAG "could not read relation existence of rel %u/%u/%u.%u from page server at lsn %X/%08X",
|
||||
RelFileInfoFmt(InfoFromSMgrRel(reln)),
|
||||
forkNum,
|
||||
(uint32) (request_lsn >> 32), (uint32) request_lsn),
|
||||
@@ -1563,7 +1668,7 @@ neon_exists(SMgrRelation reln, ForkNumber forkNum)
|
||||
break;
|
||||
|
||||
default:
|
||||
elog(ERROR, "unexpected response from page server with tag 0x%02x", resp->tag);
|
||||
neon_log(ERROR, "unexpected response from page server with tag 0x%02x", resp->tag);
|
||||
}
|
||||
pfree(resp);
|
||||
return exists;
|
||||
@@ -1580,7 +1685,7 @@ neon_create(SMgrRelation reln, ForkNumber forkNum, bool isRedo)
|
||||
switch (reln->smgr_relpersistence)
|
||||
{
|
||||
case 0:
|
||||
elog(ERROR, "cannot call smgrcreate() on rel with unknown persistence");
|
||||
neon_log(ERROR, "cannot call smgrcreate() on rel with unknown persistence");
|
||||
|
||||
case RELPERSISTENCE_PERMANENT:
|
||||
break;
|
||||
@@ -1591,10 +1696,10 @@ neon_create(SMgrRelation reln, ForkNumber forkNum, bool isRedo)
|
||||
return;
|
||||
|
||||
default:
|
||||
elog(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence);
|
||||
neon_log(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence);
|
||||
}
|
||||
|
||||
elog(SmgrTrace, "Create relation %u/%u/%u.%u",
|
||||
neon_log(SmgrTrace, "Create relation %u/%u/%u.%u",
|
||||
RelFileInfoFmt(InfoFromSMgrRel(reln)),
|
||||
forkNum);
|
||||
|
||||
@@ -1689,7 +1794,7 @@ neon_extend(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno,
|
||||
switch (reln->smgr_relpersistence)
|
||||
{
|
||||
case 0:
|
||||
elog(ERROR, "cannot call smgrextend() on rel with unknown persistence");
|
||||
neon_log(ERROR, "cannot call smgrextend() on rel with unknown persistence");
|
||||
|
||||
case RELPERSISTENCE_PERMANENT:
|
||||
break;
|
||||
@@ -1700,7 +1805,7 @@ neon_extend(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno,
|
||||
return;
|
||||
|
||||
default:
|
||||
elog(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence);
|
||||
neon_log(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence);
|
||||
}
|
||||
|
||||
/*
|
||||
@@ -1719,7 +1824,7 @@ neon_extend(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno,
|
||||
if (current_size >= ((uint64) max_cluster_size) * 1024 * 1024)
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_DISK_FULL),
|
||||
errmsg("could not extend file because project size limit (%d MB) has been exceeded",
|
||||
errmsg(NEON_TAG "could not extend file because project size limit (%d MB) has been exceeded",
|
||||
max_cluster_size),
|
||||
errhint("This limit is defined externally by the project size limit, and internally by neon.max_cluster_size GUC")));
|
||||
}
|
||||
@@ -1738,7 +1843,7 @@ neon_extend(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno,
|
||||
set_cached_relsize(InfoFromSMgrRel(reln), forkNum, blkno + 1);
|
||||
|
||||
lsn = PageGetLSN((Page) buffer);
|
||||
elog(SmgrTrace, "smgrextend called for %u/%u/%u.%u blk %u, page LSN: %X/%08X",
|
||||
neon_log(SmgrTrace, "smgrextend called for %u/%u/%u.%u blk %u, page LSN: %X/%08X",
|
||||
RelFileInfoFmt(InfoFromSMgrRel(reln)),
|
||||
forkNum, blkno,
|
||||
(uint32) (lsn >> 32), (uint32) lsn);
|
||||
@@ -1778,7 +1883,7 @@ neon_zeroextend(SMgrRelation reln, ForkNumber forkNum, BlockNumber blocknum,
|
||||
switch (reln->smgr_relpersistence)
|
||||
{
|
||||
case 0:
|
||||
elog(ERROR, "cannot call smgrextend() on rel with unknown persistence");
|
||||
neon_log(ERROR, "cannot call smgrextend() on rel with unknown persistence");
|
||||
|
||||
case RELPERSISTENCE_PERMANENT:
|
||||
break;
|
||||
@@ -1789,7 +1894,7 @@ neon_zeroextend(SMgrRelation reln, ForkNumber forkNum, BlockNumber blocknum,
|
||||
return;
|
||||
|
||||
default:
|
||||
elog(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence);
|
||||
neon_log(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence);
|
||||
}
|
||||
|
||||
if (max_cluster_size > 0 &&
|
||||
@@ -1801,7 +1906,7 @@ neon_zeroextend(SMgrRelation reln, ForkNumber forkNum, BlockNumber blocknum,
|
||||
if (current_size >= ((uint64) max_cluster_size) * 1024 * 1024)
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_DISK_FULL),
|
||||
errmsg("could not extend file because cluster size limit (%d MB) has been exceeded",
|
||||
errmsg(NEON_TAG "could not extend file because cluster size limit (%d MB) has been exceeded",
|
||||
max_cluster_size),
|
||||
errhint("This limit is defined by neon.max_cluster_size GUC")));
|
||||
}
|
||||
@@ -1814,7 +1919,7 @@ neon_zeroextend(SMgrRelation reln, ForkNumber forkNum, BlockNumber blocknum,
|
||||
if ((uint64) blocknum + nblocks >= (uint64) InvalidBlockNumber)
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
|
||||
errmsg("cannot extend file \"%s\" beyond %u blocks",
|
||||
errmsg(NEON_TAG "cannot extend file \"%s\" beyond %u blocks",
|
||||
relpath(reln->smgr_rlocator, forkNum),
|
||||
InvalidBlockNumber)));
|
||||
|
||||
@@ -1875,7 +1980,7 @@ neon_open(SMgrRelation reln)
|
||||
mdopen(reln);
|
||||
|
||||
/* no work */
|
||||
elog(SmgrTrace, "[NEON_SMGR] open noop");
|
||||
neon_log(SmgrTrace, "open noop");
|
||||
}
|
||||
|
||||
/*
|
||||
@@ -1912,7 +2017,7 @@ neon_prefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum)
|
||||
return mdprefetch(reln, forknum, blocknum);
|
||||
|
||||
default:
|
||||
elog(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence);
|
||||
neon_log(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence);
|
||||
}
|
||||
|
||||
if (lfc_cache_contains(InfoFromSMgrRel(reln), forknum, blocknum))
|
||||
@@ -1957,11 +2062,11 @@ neon_writeback(SMgrRelation reln, ForkNumber forknum,
|
||||
return;
|
||||
|
||||
default:
|
||||
elog(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence);
|
||||
neon_log(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence);
|
||||
}
|
||||
|
||||
/* not implemented */
|
||||
elog(SmgrTrace, "[NEON_SMGR] writeback noop");
|
||||
neon_log(SmgrTrace, "writeback noop");
|
||||
|
||||
#ifdef DEBUG_COMPARE_LOCAL
|
||||
if (IS_LOCAL_REL(reln))
|
||||
@@ -2091,8 +2196,8 @@ neon_read_at_lsn(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
|
||||
case T_NeonErrorResponse:
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_IO_ERROR),
|
||||
errmsg("could not read block %u in rel %u/%u/%u.%u from page server at lsn %X/%08X",
|
||||
blkno,
|
||||
errmsg(NEON_TAG "[shard %d] could not read block %u in rel %u/%u/%u.%u from page server at lsn %X/%08X",
|
||||
slot->shard_no, blkno,
|
||||
RelFileInfoFmt(rinfo),
|
||||
forkNum,
|
||||
(uint32) (request_lsn >> 32), (uint32) request_lsn),
|
||||
@@ -2100,7 +2205,7 @@ neon_read_at_lsn(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
|
||||
((NeonErrorResponse *) resp)->message)));
|
||||
break;
|
||||
default:
|
||||
elog(ERROR, "unexpected response from page server with tag 0x%02x", resp->tag);
|
||||
neon_log(ERROR, "unexpected response from page server with tag 0x%02x", resp->tag);
|
||||
}
|
||||
|
||||
/* buffer was used, clean up for later reuse */
|
||||
@@ -2124,7 +2229,7 @@ neon_read(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno, void *buffer
|
||||
switch (reln->smgr_relpersistence)
|
||||
{
|
||||
case 0:
|
||||
elog(ERROR, "cannot call smgrread() on rel with unknown persistence");
|
||||
neon_log(ERROR, "cannot call smgrread() on rel with unknown persistence");
|
||||
|
||||
case RELPERSISTENCE_PERMANENT:
|
||||
break;
|
||||
@@ -2135,7 +2240,7 @@ neon_read(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno, void *buffer
|
||||
return;
|
||||
|
||||
default:
|
||||
elog(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence);
|
||||
neon_log(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence);
|
||||
}
|
||||
|
||||
/* Try to read from local file cache */
|
||||
@@ -2163,7 +2268,7 @@ neon_read(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno, void *buffer
|
||||
{
|
||||
if (!PageIsNew((Page) pageserver_masked))
|
||||
{
|
||||
elog(PANIC, "page is new in MD but not in Page Server at blk %u in rel %u/%u/%u fork %u (request LSN %X/%08X):\n%s\n",
|
||||
neon_log(PANIC, "page is new in MD but not in Page Server at blk %u in rel %u/%u/%u fork %u (request LSN %X/%08X):\n%s\n",
|
||||
blkno,
|
||||
RelFileInfoFmt(InfoFromSMgrRel(reln)),
|
||||
forkNum,
|
||||
@@ -2173,7 +2278,7 @@ neon_read(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno, void *buffer
|
||||
}
|
||||
else if (PageIsNew((Page) buffer))
|
||||
{
|
||||
elog(PANIC, "page is new in Page Server but not in MD at blk %u in rel %u/%u/%u fork %u (request LSN %X/%08X):\n%s\n",
|
||||
neon_log(PANIC, "page is new in Page Server but not in MD at blk %u in rel %u/%u/%u fork %u (request LSN %X/%08X):\n%s\n",
|
||||
blkno,
|
||||
RelFileInfoFmt(InfoFromSMgrRel(reln)),
|
||||
forkNum,
|
||||
@@ -2188,7 +2293,7 @@ neon_read(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno, void *buffer
|
||||
|
||||
if (memcmp(mdbuf_masked, pageserver_masked, BLCKSZ) != 0)
|
||||
{
|
||||
elog(PANIC, "heap buffers differ at blk %u in rel %u/%u/%u fork %u (request LSN %X/%08X):\n------ MD ------\n%s\n------ Page Server ------\n%s\n",
|
||||
neon_log(PANIC, "heap buffers differ at blk %u in rel %u/%u/%u fork %u (request LSN %X/%08X):\n------ MD ------\n%s\n------ Page Server ------\n%s\n",
|
||||
blkno,
|
||||
RelFileInfoFmt(InfoFromSMgrRel(reln)),
|
||||
forkNum,
|
||||
@@ -2207,7 +2312,7 @@ neon_read(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno, void *buffer
|
||||
|
||||
if (memcmp(mdbuf_masked, pageserver_masked, BLCKSZ) != 0)
|
||||
{
|
||||
elog(PANIC, "btree buffers differ at blk %u in rel %u/%u/%u fork %u (request LSN %X/%08X):\n------ MD ------\n%s\n------ Page Server ------\n%s\n",
|
||||
neon_log(PANIC, "btree buffers differ at blk %u in rel %u/%u/%u fork %u (request LSN %X/%08X):\n------ MD ------\n%s\n------ Page Server ------\n%s\n",
|
||||
blkno,
|
||||
RelFileInfoFmt(InfoFromSMgrRel(reln)),
|
||||
forkNum,
|
||||
@@ -2287,13 +2392,13 @@ neon_write(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, const vo
|
||||
return;
|
||||
|
||||
default:
|
||||
elog(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence);
|
||||
neon_log(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence);
|
||||
}
|
||||
|
||||
neon_wallog_page(reln, forknum, blocknum, buffer, false);
|
||||
|
||||
lsn = PageGetLSN((Page) buffer);
|
||||
elog(SmgrTrace, "smgrwrite called for %u/%u/%u.%u blk %u, page LSN: %X/%08X",
|
||||
neon_log(SmgrTrace, "smgrwrite called for %u/%u/%u.%u blk %u, page LSN: %X/%08X",
|
||||
RelFileInfoFmt(InfoFromSMgrRel(reln)),
|
||||
forknum, blocknum,
|
||||
(uint32) (lsn >> 32), (uint32) lsn);
|
||||
@@ -2320,7 +2425,7 @@ neon_nblocks(SMgrRelation reln, ForkNumber forknum)
|
||||
switch (reln->smgr_relpersistence)
|
||||
{
|
||||
case 0:
|
||||
elog(ERROR, "cannot call smgrnblocks() on rel with unknown persistence");
|
||||
neon_log(ERROR, "cannot call smgrnblocks() on rel with unknown persistence");
|
||||
break;
|
||||
|
||||
case RELPERSISTENCE_PERMANENT:
|
||||
@@ -2331,12 +2436,12 @@ neon_nblocks(SMgrRelation reln, ForkNumber forknum)
|
||||
return mdnblocks(reln, forknum);
|
||||
|
||||
default:
|
||||
elog(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence);
|
||||
neon_log(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence);
|
||||
}
|
||||
|
||||
if (get_cached_relsize(InfoFromSMgrRel(reln), forknum, &n_blocks))
|
||||
{
|
||||
elog(SmgrTrace, "cached nblocks for %u/%u/%u.%u: %u blocks",
|
||||
neon_log(SmgrTrace, "cached nblocks for %u/%u/%u.%u: %u blocks",
|
||||
RelFileInfoFmt(InfoFromSMgrRel(reln)),
|
||||
forknum, n_blocks);
|
||||
return n_blocks;
|
||||
@@ -2364,7 +2469,7 @@ neon_nblocks(SMgrRelation reln, ForkNumber forknum)
|
||||
case T_NeonErrorResponse:
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_IO_ERROR),
|
||||
errmsg("could not read relation size of rel %u/%u/%u.%u from page server at lsn %X/%08X",
|
||||
errmsg(NEON_TAG "could not read relation size of rel %u/%u/%u.%u from page server at lsn %X/%08X",
|
||||
RelFileInfoFmt(InfoFromSMgrRel(reln)),
|
||||
forknum,
|
||||
(uint32) (request_lsn >> 32), (uint32) request_lsn),
|
||||
@@ -2373,11 +2478,11 @@ neon_nblocks(SMgrRelation reln, ForkNumber forknum)
|
||||
break;
|
||||
|
||||
default:
|
||||
elog(ERROR, "unexpected response from page server with tag 0x%02x", resp->tag);
|
||||
neon_log(ERROR, "unexpected response from page server with tag 0x%02x", resp->tag);
|
||||
}
|
||||
update_cached_relsize(InfoFromSMgrRel(reln), forknum, n_blocks);
|
||||
|
||||
elog(SmgrTrace, "neon_nblocks: rel %u/%u/%u fork %u (request LSN %X/%08X): %u blocks",
|
||||
neon_log(SmgrTrace, "neon_nblocks: rel %u/%u/%u fork %u (request LSN %X/%08X): %u blocks",
|
||||
RelFileInfoFmt(InfoFromSMgrRel(reln)),
|
||||
forknum,
|
||||
(uint32) (request_lsn >> 32), (uint32) request_lsn,
|
||||
@@ -2420,7 +2525,7 @@ neon_dbsize(Oid dbNode)
|
||||
case T_NeonErrorResponse:
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_IO_ERROR),
|
||||
errmsg("could not read db size of db %u from page server at lsn %X/%08X",
|
||||
errmsg(NEON_TAG "could not read db size of db %u from page server at lsn %X/%08X",
|
||||
dbNode,
|
||||
(uint32) (request_lsn >> 32), (uint32) request_lsn),
|
||||
errdetail("page server returned error: %s",
|
||||
@@ -2428,10 +2533,10 @@ neon_dbsize(Oid dbNode)
|
||||
break;
|
||||
|
||||
default:
|
||||
elog(ERROR, "unexpected response from page server with tag 0x%02x", resp->tag);
|
||||
neon_log(ERROR, "unexpected response from page server with tag 0x%02x", resp->tag);
|
||||
}
|
||||
|
||||
elog(SmgrTrace, "neon_dbsize: db %u (request LSN %X/%08X): %ld bytes",
|
||||
neon_log(SmgrTrace, "neon_dbsize: db %u (request LSN %X/%08X): %ld bytes",
|
||||
dbNode,
|
||||
(uint32) (request_lsn >> 32), (uint32) request_lsn,
|
||||
db_size);
|
||||
@@ -2451,7 +2556,7 @@ neon_truncate(SMgrRelation reln, ForkNumber forknum, BlockNumber nblocks)
|
||||
switch (reln->smgr_relpersistence)
|
||||
{
|
||||
case 0:
|
||||
elog(ERROR, "cannot call smgrtruncate() on rel with unknown persistence");
|
||||
neon_log(ERROR, "cannot call smgrtruncate() on rel with unknown persistence");
|
||||
break;
|
||||
|
||||
case RELPERSISTENCE_PERMANENT:
|
||||
@@ -2463,7 +2568,7 @@ neon_truncate(SMgrRelation reln, ForkNumber forknum, BlockNumber nblocks)
|
||||
return;
|
||||
|
||||
default:
|
||||
elog(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence);
|
||||
neon_log(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence);
|
||||
}
|
||||
|
||||
set_cached_relsize(InfoFromSMgrRel(reln), forknum, nblocks);
|
||||
@@ -2519,7 +2624,7 @@ neon_immedsync(SMgrRelation reln, ForkNumber forknum)
|
||||
switch (reln->smgr_relpersistence)
|
||||
{
|
||||
case 0:
|
||||
elog(ERROR, "cannot call smgrimmedsync() on rel with unknown persistence");
|
||||
neon_log(ERROR, "cannot call smgrimmedsync() on rel with unknown persistence");
|
||||
break;
|
||||
|
||||
case RELPERSISTENCE_PERMANENT:
|
||||
@@ -2531,10 +2636,10 @@ neon_immedsync(SMgrRelation reln, ForkNumber forknum)
|
||||
return;
|
||||
|
||||
default:
|
||||
elog(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence);
|
||||
neon_log(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence);
|
||||
}
|
||||
|
||||
elog(SmgrTrace, "[NEON_SMGR] immedsync noop");
|
||||
neon_log(SmgrTrace, "[NEON_SMGR] immedsync noop");
|
||||
|
||||
#ifdef DEBUG_COMPARE_LOCAL
|
||||
if (IS_LOCAL_REL(reln))
|
||||
@@ -2559,17 +2664,17 @@ neon_start_unlogged_build(SMgrRelation reln)
|
||||
* progress at a time. That's enough for the current usage.
|
||||
*/
|
||||
if (unlogged_build_phase != UNLOGGED_BUILD_NOT_IN_PROGRESS)
|
||||
elog(ERROR, "unlogged relation build is already in progress");
|
||||
neon_log(ERROR, "unlogged relation build is already in progress");
|
||||
Assert(unlogged_build_rel == NULL);
|
||||
|
||||
ereport(SmgrTrace,
|
||||
(errmsg("starting unlogged build of relation %u/%u/%u",
|
||||
(errmsg(NEON_TAG "starting unlogged build of relation %u/%u/%u",
|
||||
RelFileInfoFmt(InfoFromSMgrRel(reln)))));
|
||||
|
||||
switch (reln->smgr_relpersistence)
|
||||
{
|
||||
case 0:
|
||||
elog(ERROR, "cannot call smgr_start_unlogged_build() on rel with unknown persistence");
|
||||
neon_log(ERROR, "cannot call smgr_start_unlogged_build() on rel with unknown persistence");
|
||||
break;
|
||||
|
||||
case RELPERSISTENCE_PERMANENT:
|
||||
@@ -2582,11 +2687,11 @@ neon_start_unlogged_build(SMgrRelation reln)
|
||||
return;
|
||||
|
||||
default:
|
||||
elog(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence);
|
||||
neon_log(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence);
|
||||
}
|
||||
|
||||
if (smgrnblocks(reln, MAIN_FORKNUM) != 0)
|
||||
elog(ERROR, "cannot perform unlogged index build, index is not empty ");
|
||||
neon_log(ERROR, "cannot perform unlogged index build, index is not empty ");
|
||||
|
||||
unlogged_build_rel = reln;
|
||||
unlogged_build_phase = UNLOGGED_BUILD_PHASE_1;
|
||||
@@ -2613,7 +2718,7 @@ neon_finish_unlogged_build_phase_1(SMgrRelation reln)
|
||||
Assert(unlogged_build_rel == reln);
|
||||
|
||||
ereport(SmgrTrace,
|
||||
(errmsg("finishing phase 1 of unlogged build of relation %u/%u/%u",
|
||||
(errmsg(NEON_TAG "finishing phase 1 of unlogged build of relation %u/%u/%u",
|
||||
RelFileInfoFmt(InfoFromSMgrRel(reln)))));
|
||||
|
||||
if (unlogged_build_phase == UNLOGGED_BUILD_NOT_PERMANENT)
|
||||
@@ -2642,7 +2747,7 @@ neon_end_unlogged_build(SMgrRelation reln)
|
||||
Assert(unlogged_build_rel == reln);
|
||||
|
||||
ereport(SmgrTrace,
|
||||
(errmsg("ending unlogged build of relation %u/%u/%u",
|
||||
(errmsg(NEON_TAG "ending unlogged build of relation %u/%u/%u",
|
||||
RelFileInfoFmt(InfoFromNInfoB(rinfob)))));
|
||||
|
||||
if (unlogged_build_phase != UNLOGGED_BUILD_NOT_PERMANENT)
|
||||
@@ -2657,7 +2762,7 @@ neon_end_unlogged_build(SMgrRelation reln)
|
||||
rinfob = InfoBFromSMgrRel(reln);
|
||||
for (int forknum = 0; forknum <= MAX_FORKNUM; forknum++)
|
||||
{
|
||||
elog(SmgrTrace, "forgetting cached relsize for %u/%u/%u.%u",
|
||||
neon_log(SmgrTrace, "forgetting cached relsize for %u/%u/%u.%u",
|
||||
RelFileInfoFmt(InfoFromNInfoB(rinfob)),
|
||||
forknum);
|
||||
|
||||
@@ -2672,6 +2777,61 @@ neon_end_unlogged_build(SMgrRelation reln)
	unlogged_build_phase = UNLOGGED_BUILD_NOT_IN_PROGRESS;
}

static int
neon_read_slru_segment(SMgrRelation reln, SlruKind kind, int segno, void* buffer)
{
	XLogRecPtr	request_lsn;
	/* TODO: any better alternative than flush LSN? Actually we need to request the SLRU segment as of basebackup creation time... */
#if PG_VERSION_NUM >= 150000
	request_lsn = GetFlushRecPtr(NULL);
#else
	request_lsn = GetFlushRecPtr();
#endif
	NeonResponse *resp;
	shardno_t	shard_no = 0;	/* SLRUs are served by shard 0 */
	NeonGetSlruSegmentRequest request = {
		.req.tag = T_NeonGetSlruSegmentRequest,
		.req.latest = false,
		.req.lsn = request_lsn,

		.kind = kind,
		.segno = segno
	};
	int			n_blocks;

	do
	{
		while (!page_server->send(shard_no, &request.req) || !page_server->flush(shard_no));
		consume_prefetch_responses();
		resp = page_server->receive(shard_no);
	} while (resp == NULL);

	switch (resp->tag)
	{
		case T_NeonGetSlruSegmentResponse:
			n_blocks = ((NeonGetSlruSegmentResponse *) resp)->n_blocks;
			memcpy(buffer, ((NeonGetSlruSegmentResponse *) resp)->data, n_blocks * BLCKSZ);
			break;

		case T_NeonErrorResponse:
			ereport(ERROR,
					(errcode(ERRCODE_IO_ERROR),
					 errmsg(NEON_TAG "could not read SLRU %d segment %d at lsn %X/%08X",
							kind,
							segno,
							(uint32) (request_lsn >> 32), (uint32) request_lsn),
					 errdetail("page server returned error: %s",
							   ((NeonErrorResponse *) resp)->message)));
			break;

		default:
			neon_log(ERROR, "unexpected response from page server with tag 0x%02x", resp->tag);
	}
	pfree(resp);

	return n_blocks;
}
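For illustration, a hypothetical caller of the new function might look like the sketch below. It assumes the destination buffer is sized for a full SLRU segment (SLRU_PAGES_PER_SEGMENT pages of BLCKSZ bytes, per access/slru.h in vanilla PostgreSQL) and that only the first n_blocks pages are valid on return; the helper name and the direct call are illustrative only, not part of this commit.

	/* Hypothetical caller sketch -- not part of this diff. */
	static int
	example_load_slru_segment(SMgrRelation reln, SlruKind kind, int segno)
	{
		/* Assumed sizing: one full SLRU segment. */
		char	   *buf = palloc(SLRU_PAGES_PER_SEGMENT * BLCKSZ);
		int			n_blocks = neon_read_slru_segment(reln, kind, segno, buf);

		/* Only the first n_blocks * BLCKSZ bytes of buf are populated here. */
		pfree(buf);
		return n_blocks;
	}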

static void
AtEOXact_neon(XactEvent event, void *arg)
{
@@ -2700,7 +2860,7 @@ AtEOXact_neon(XactEvent event, void *arg)
				unlogged_build_phase = UNLOGGED_BUILD_NOT_IN_PROGRESS;
				ereport(ERROR,
						(errcode(ERRCODE_INTERNAL_ERROR),
						 (errmsg("unlogged index build was not properly finished"))));
						 (errmsg(NEON_TAG "unlogged index build was not properly finished"))));
			}
			break;
	}
@@ -2730,6 +2890,8 @@ static const struct f_smgr neon_smgr
	.smgr_start_unlogged_build = neon_start_unlogged_build,
	.smgr_finish_unlogged_build_phase_1 = neon_finish_unlogged_build_phase_1,
	.smgr_end_unlogged_build = neon_end_unlogged_build,

	.smgr_read_slru_segment = neon_read_slru_segment,
};

const f_smgr *
@@ -2799,7 +2961,7 @@ neon_extend_rel_size(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
		set_cached_relsize(rinfo, forknum, relsize);
		SetLastWrittenLSNForRelation(end_recptr, rinfo, forknum);

		elog(SmgrTrace, "Set length to %d", relsize);
		neon_log(SmgrTrace, "Set length to %d", relsize);
	}
}

@@ -2887,7 +3049,7 @@ neon_redo_read_buffer_filter(XLogReaderState *record, uint8 block_id)

#if PG_VERSION_NUM < 150000
	if (!XLogRecGetBlockTag(record, block_id, &rinfo, &forknum, &blkno))
		elog(PANIC, "failed to locate backup block with ID %d", block_id);
		neon_log(PANIC, "failed to locate backup block with ID %d", block_id);
#else
	XLogRecGetBlockTag(record, block_id, &rinfo, &forknum, &blkno);
#endif

@@ -60,6 +60,7 @@ fn analyze_trace<R: std::io::Read>(mut reader: R) {
        match msg {
            PagestreamFeMessage::Exists(_) => {}
            PagestreamFeMessage::Nblocks(_) => {}
            PagestreamFeMessage::GetSlruSegment(_) => {}
            PagestreamFeMessage::GetPage(req) => {
                total += 1;

Submodule vendor/postgres-v14 updated: dd067cf656...3b28a69827
Submodule vendor/postgres-v15 updated: bc88f53931...7a9d31fd82
Submodule vendor/postgres-v16 updated: e3a22b7292...ce3b15942c
vendor/revisions.json (vendored)
@@ -1,5 +1,5 @@
{
    "postgres-v16": "e3a22b72922055f9212eca12700190f118578362",
    "postgres-v15": "bc88f539312fcc4bb292ce94ae9db09ab6656e8a",
    "postgres-v14": "dd067cf656f6810a25aca6025633d32d02c5085a"
    "postgres-v16": "ce3b15942c91adec8e83a43d2cb713038f2fcf53",
    "postgres-v15": "7a9d31fd826d251b7f62f1f83808bcd00c5ef554",
    "postgres-v14": "3b28a698276dd17aadd883adc8e0a9ff0f87be0f"
}