Fetches the SLRU segment via the new communicator.

The fetch is done not into a buffer as earlier, but directly into the
file.
This commit is contained in:
Victor Polevoy
2025-07-10 11:02:32 +02:00
parent 10a7d49726
commit cb50291dcd
23 changed files with 275 additions and 82 deletions

View File

@@ -14,6 +14,7 @@
//! in-place and are at a high level achieved by expanding/reducing the bucket array and rebuilding the
//! dictionary by rehashing all keys.
use std::fmt::Debug;
use std::hash::{BuildHasher, Hash};
use std::mem::MaybeUninit;
@@ -41,6 +42,22 @@ pub struct HashMapInit<'a, K, V, S = rustc_hash::FxBuildHasher> {
num_buckets: u32,
}
impl<'a, K, V, S> Debug for HashMapInit<'a, K, V, S>
where
K: Debug,
V: Debug,
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("HashMapInit")
.field("shmem_handle", &self.shmem_handle)
.field("shared_ptr", &self.shared_ptr)
.field("shared_size", &self.shared_size)
// .field("hasher", &self.hasher)
.field("num_buckets", &self.num_buckets)
.finish()
}
}
/// This is a per-process handle to a hash table that (possibly) lives in shared memory.
/// If a child process is launched with fork(), the child process should
/// get its own HashMapAccess by calling HashMapInit::attach_writer/reader().
@@ -56,6 +73,20 @@ pub struct HashMapAccess<'a, K, V, S = rustc_hash::FxBuildHasher> {
unsafe impl<K: Sync, V: Sync, S> Sync for HashMapAccess<'_, K, V, S> {}
unsafe impl<K: Send, V: Send, S> Send for HashMapAccess<'_, K, V, S> {}
impl<'a, K, V, S> Debug for HashMapAccess<'a, K, V, S>
where
K: Debug,
V: Debug,
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("HashMapAccess")
.field("shmem_handle", &self.shmem_handle)
.field("shared_ptr", &self.shared_ptr)
// .field("hasher", &self.hasher)
.finish()
}
}
impl<'a, K: Clone + Hash + Eq, V, S> HashMapInit<'a, K, V, S> {
/// Change the 'hasher' used by the hash table.
///

View File

@@ -1,5 +1,6 @@
//! Simple hash table with chaining.
use std::fmt::Debug;
use std::hash::Hash;
use std::mem::MaybeUninit;
@@ -17,6 +18,19 @@ pub(crate) struct Bucket<K, V> {
pub(crate) inner: Option<(K, V)>,
}
impl<K, V> Debug for Bucket<K, V>
where
K: Debug,
V: Debug,
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Bucket")
.field("next", &self.next)
.field("inner", &self.inner)
.finish()
}
}
/// Core hash table implementation.
pub(crate) struct CoreHashMap<'a, K, V> {
/// Dictionary used to map hashes to bucket indices.
@@ -34,6 +48,22 @@ pub(crate) struct CoreHashMap<'a, K, V> {
pub(crate) _user_list_head: u32,
}
impl<'a, K, V> Debug for CoreHashMap<'a, K, V>
where
K: Debug,
V: Debug,
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("CoreHashMap")
.field("dictionary", &self.dictionary)
.field("buckets", &self.buckets)
.field("free_head", &self.free_head)
.field("alloc_limit", &self.alloc_limit)
.field("buckets_in_use", &self.buckets_in_use)
.finish()
}
}
/// Error for when there are no empty buckets left but one is needed.
#[derive(Debug, PartialEq)]
pub struct FullError();

View File

@@ -21,6 +21,7 @@ use nix::unistd::ftruncate as nix_ftruncate;
/// the underlying file is resized. Do not access the area beyond the current size. Currently, that
/// will cause the file to be expanded, but we might use `mprotect()` etc. to enforce that in the
/// future.
#[derive(Debug)]
pub struct ShmemHandle {
/// memfd file descriptor
fd: OwnedFd,
@@ -35,6 +36,7 @@ pub struct ShmemHandle {
}
/// This is stored at the beginning in the shared memory area.
#[derive(Debug)]
struct SharedStruct {
max_size: usize,

View File

@@ -22,6 +22,7 @@ pub type CacheBlock = u64;
pub const INVALID_CACHE_BLOCK: CacheBlock = u64::MAX;
#[derive(Debug)]
pub struct FileCache {
file: Arc<File>,
@@ -35,6 +36,7 @@ pub struct FileCache {
// TODO: We keep track of all free blocks in this vec. That doesn't really scale.
// Idea: when free_blocks fills up with more than 1024 entries, write them all to
// one block on disk.
#[derive(Debug)]
struct FreeList {
next_free_block: CacheBlock,
max_blocks: u64,

View File

@@ -46,6 +46,7 @@ pub struct IntegratedCacheInitStruct<'t> {
}
/// Represents write-access to the integrated cache. This is used by the communicator process.
#[derive(Debug)]
pub struct IntegratedCacheWriteAccess<'t> {
relsize_cache: neon_shmem::hash::HashMapAccess<'t, RelKey, RelEntry>,
block_map: neon_shmem::hash::HashMapAccess<'t, BlockKey, BlockEntry>,

View File

@@ -21,5 +21,5 @@ mod worker_process;
mod global_allocator;
// FIXME get this from postgres headers somehow
// FIXME: get this from postgres headers somehow
pub const BLCKSZ: usize = 8192;

View File

@@ -15,7 +15,9 @@ pub type COid = u32;
// This conveniently matches PG_IOV_MAX
pub const MAX_GETPAGEV_PAGES: usize = 32;
use pageserver_page_api as page_api;
use std::ffi::CStr;
use pageserver_page_api::{self as page_api, SlruKind};
/// Request from a Postgres backend to the communicator process
#[allow(clippy::large_enum_variant)]
@@ -29,6 +31,7 @@ pub enum NeonIORequest {
RelExists(CRelExistsRequest),
RelSize(CRelSizeRequest),
GetPageV(CGetPageVRequest),
ReadSlruSegment(CReadSlruSegmentRequest),
PrefetchV(CPrefetchVRequest),
DbSize(CDbSizeRequest),
@@ -54,6 +57,9 @@ pub enum NeonIOResult {
/// the result pages are written to the shared memory addresses given in the request
GetPageV,
/// The result is written to the file, path to which is provided
/// in the request. The [`u64`] value here is the number of blocks.
ReadSlruSegment(u64),
/// A prefetch request returns as soon as the request has been received by the communicator.
/// It is processed in the background.
@@ -83,6 +89,7 @@ impl NeonIORequest {
RelExists(req) => req.request_id,
RelSize(req) => req.request_id,
GetPageV(req) => req.request_id,
ReadSlruSegment(req) => req.request_id,
PrefetchV(req) => req.request_id,
DbSize(req) => req.request_id,
WritePage(req) => req.request_id,
@@ -193,6 +200,28 @@ pub struct CGetPageVRequest {
pub dest: [ShmemBuf; MAX_GETPAGEV_PAGES],
}
#[repr(C)]
#[derive(Copy, Clone, Debug)]
pub struct CReadSlruSegmentRequest {
pub request_id: u64,
pub slru_kind: SlruKind,
pub segment_number: u32,
pub request_lsn: CLsn,
/// Must be a null-terminated C string containing the file path
/// where the communicator will write the SLRU segment.
pub destination_file_path: ShmemBuf,
}
impl CReadSlruSegmentRequest {
/// Returns the file path where the communicator will write the
/// SLRU segment.
pub(crate) fn destination_file_path(&self) -> String {
unsafe { CStr::from_ptr(self.destination_file_path.as_mut_ptr() as *const _) }
.to_string_lossy()
.into_owned()
}
}
#[repr(C)]
#[derive(Copy, Clone, Debug)]
pub struct CPrefetchVRequest {

View File

@@ -24,7 +24,7 @@ use utils::id::{TenantId, TimelineId};
use super::callbacks::{get_request_lsn, notify_proc};
use tracing::{error, info, info_span, trace};
use tracing::{debug, error, info, info_span, trace};
use utils::lsn::Lsn;
@@ -58,6 +58,7 @@ pub struct CommunicatorWorkerProcessStruct<'a> {
request_rel_exists_counter: IntCounter,
request_rel_size_counter: IntCounter,
request_get_pagev_counter: IntCounter,
request_read_slru_segment_counter: IntCounter,
request_prefetchv_counter: IntCounter,
request_db_size_counter: IntCounter,
request_write_page_counter: IntCounter,
@@ -106,6 +107,8 @@ pub(super) async fn init(
.integrated_cache_init_struct
.worker_process_init(last_lsn, file_cache);
debug!("Initialised integrated cache: {cache:?}");
let tenant_id = TenantId::from_str(&tenant_id).expect("invalid tenant ID");
let timeline_id = TimelineId::from_str(&timeline_id).expect("invalid timeline ID");
let shard_spec = ShardSpec::new(shard_map, stripe_size).expect("invalid shard spec");
@@ -123,6 +126,8 @@ pub(super) async fn init(
let request_rel_exists_counter = request_counters.with_label_values(&["rel_exists"]);
let request_rel_size_counter = request_counters.with_label_values(&["rel_size"]);
let request_get_pagev_counter = request_counters.with_label_values(&["get_pagev"]);
let request_read_slru_segment_counter =
request_counters.with_label_values(&["read_slru_segment"]);
let request_prefetchv_counter = request_counters.with_label_values(&["prefetchv"]);
let request_db_size_counter = request_counters.with_label_values(&["db_size"]);
let request_write_page_counter = request_counters.with_label_values(&["write_page"]);
@@ -173,6 +178,7 @@ pub(super) async fn init(
request_rel_exists_counter,
request_rel_size_counter,
request_get_pagev_counter,
request_read_slru_segment_counter,
request_prefetchv_counter,
request_db_size_counter,
request_write_page_counter,
@@ -418,6 +424,36 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
Err(errno) => NeonIOResult::Error(errno),
}
}
NeonIORequest::ReadSlruSegment(req) => {
self.request_read_slru_segment_counter.inc();
let lsn = Lsn(req.request_lsn);
let file_path = req.destination_file_path();
match self
.client
.get_slru_segment(page_api::GetSlruSegmentRequest {
read_lsn: self.request_lsns(lsn),
kind: req.slru_kind,
segno: req.segment_number,
})
.await
{
Ok(slru_bytes) => {
if let Err(e) = tokio::fs::write(&file_path, &slru_bytes).await {
info!("could not write slru segment to file {file_path}: {e}");
return NeonIOResult::Error(e.raw_os_error().unwrap_or(libc::EIO));
}
let blocks_count = slru_bytes.len() / crate::BLCKSZ;
NeonIOResult::ReadSlruSegment(blocks_count as _)
}
Err(err) => {
info!("tonic error: {err:?}");
NeonIOResult::Error(0)
}
}
}
NeonIORequest::PrefetchV(req) => {
self.request_prefetchv_counter.inc();
self.request_prefetchv_nblocks_counter

View File

@@ -997,10 +997,58 @@ communicator_new_dbsize(Oid dbNode)
}
int
communicator_new_read_slru_segment(SlruKind kind, int64 segno, void *buffer)
communicator_new_read_slru_segment(
SlruKind kind,
uint32_t segno,
neon_request_lsns *request_lsns,
const char* path)
{
/* TODO */
elog(ERROR, "not implemented");
NeonIOResult result = {};
NeonIORequest request = {
.tag = NeonIORequest_ReadSlruSegment,
.read_slru_segment = {
.request_id = assign_request_id(),
.slru_kind = kind,
.segment_number = segno,
.request_lsn = request_lsns->request_lsn,
}
};
int nblocks = -1;
char *temp_path = bounce_buf();
if (path == NULL) {
elog(ERROR, "read_slru_segment called with NULL path");
return -1;
}
strlcpy(temp_path, path, BLCKSZ);
request.read_slru_segment.destination_file_path.ptr = (uint8_t *) temp_path;
elog(DEBUG5, "readslrusegment called for kind=%u, segno=%u, file_path=\"%s\"",
kind, segno, request.read_slru_segment.destination_file_path.ptr);
/* FIXME: see `request_lsns` in main_loop.rs for why this is needed */
XLogSetAsyncXactLSN(request_lsns->request_lsn);
perform_request(&request, &result);
switch (result.tag)
{
case NeonIOResult_ReadSlruSegment:
nblocks = result.read_slru_segment;
break;
case NeonIOResult_Error:
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not read slru segment, kind=%u, segno=%u: %s",
kind, segno, pg_strerror(result.error))));
break;
default:
elog(ERROR, "unexpected result for read SLRU operation: %d", result.tag);
break;
}
return nblocks;
}
/* Write requests */
@@ -1305,6 +1353,18 @@ print_neon_io_request(NeonIORequest *request)
r->spc_oid, r->db_oid, r->rel_number, r->fork_number, r->block_number, r->block_number + r->nblocks);
return buf;
}
case NeonIORequest_ReadSlruSegment:
{
CReadSlruSegmentRequest *r = &request->read_slru_segment;
snprintf(buf, sizeof(buf), "ReadSlruSegment: req " UINT64_FORMAT " slrukind=%u, segno=%u, lsn=%X/%X, file_path=\"%s\"",
r->request_id,
r->slru_kind,
r->segment_number,
LSN_FORMAT_ARGS(r->request_lsn),
r->destination_file_path.ptr);
return buf;
}
case NeonIORequest_PrefetchV:
{
CPrefetchVRequest *r = &request->prefetch_v;

View File

@@ -38,8 +38,12 @@ extern void communicator_new_prefetch_register_bufferv(NRelFileInfo rinfo, ForkN
BlockNumber nblocks);
extern bool communicator_new_cache_contains(NRelFileInfo rinfo, ForkNumber forkNum,
BlockNumber blockno);
extern int communicator_new_read_slru_segment(SlruKind kind, int64 segno,
void *buffer);
extern int communicator_new_read_slru_segment(
SlruKind kind,
uint32_t segno,
neon_request_lsns *request_lsns,
const char *path
);
/* Write requests, to keep the caches up-to-date */
extern void communicator_new_write_page(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blockno,

View File

@@ -258,7 +258,7 @@ lfc_switch_off(void)
{
int fd;
Assert(!neon_enable_new_communicator);
Assert(!neon_use_communicator_worker);
if (LFC_ENABLED())
{
@@ -325,7 +325,7 @@ lfc_maybe_disabled(void)
static bool
lfc_ensure_opened(void)
{
Assert(!neon_enable_new_communicator);
Assert(!neon_use_communicator_worker);
if (lfc_generation != lfc_ctl->generation)
{
@@ -352,7 +352,7 @@ lfc_shmem_startup(void)
bool found;
static HASHCTL info;
Assert(!neon_enable_new_communicator);
Assert(!neon_use_communicator_worker);
if (prev_shmem_startup_hook)
{
@@ -652,7 +652,7 @@ lfc_init(void)
if (lfc_max_size == 0)
return;
if (neon_enable_new_communicator)
if (neon_use_communicator_worker)
return;
prev_shmem_startup_hook = shmem_startup_hook;
@@ -730,7 +730,7 @@ lfc_prewarm(FileCacheState* fcs, uint32 n_workers)
dsm_segment *seg;
BackgroundWorkerHandle* bgw_handle[MAX_PREWARM_WORKERS];
Assert(!neon_enable_new_communicator);
Assert(!neon_use_communicator_worker);
if (!lfc_ensure_opened())
return;
@@ -885,7 +885,7 @@ lfc_prewarm_main(Datum main_arg)
PrewarmWorkerState* ws;
uint32 worker_id = DatumGetInt32(main_arg);
Assert(!neon_enable_new_communicator);
Assert(!neon_use_communicator_worker);
AmPrewarmWorker = true;
@@ -987,7 +987,7 @@ lfc_invalidate(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber nblocks)
FileCacheEntry *entry;
uint32 hash;
Assert(!neon_enable_new_communicator);
Assert(!neon_use_communicator_worker);
if (lfc_maybe_disabled()) /* fast exit if file cache is disabled */
return;
@@ -1034,7 +1034,7 @@ lfc_cache_contains(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno)
bool found = false;
uint32 hash;
Assert(!neon_enable_new_communicator);
Assert(!neon_use_communicator_worker);
if (lfc_maybe_disabled()) /* fast exit if file cache is disabled */
return false;
@@ -1071,7 +1071,7 @@ lfc_cache_containsv(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
uint32 hash;
int i = 0;
Assert(!neon_enable_new_communicator);
Assert(!neon_use_communicator_worker);
if (lfc_maybe_disabled()) /* fast exit if file cache is disabled */
return 0;
@@ -1180,7 +1180,7 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
int blocks_read = 0;
int buf_offset = 0;
Assert(!neon_enable_new_communicator);
Assert(!neon_use_communicator_worker);
if (lfc_maybe_disabled()) /* fast exit if file cache is disabled */
return -1;
@@ -1547,7 +1547,7 @@ lfc_prefetch(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
int chunk_offs = BLOCK_TO_CHUNK_OFF(blkno);
Assert(!neon_enable_new_communicator);
Assert(!neon_use_communicator_worker);
if (lfc_maybe_disabled()) /* fast exit if file cache is disabled */
return false;
@@ -1694,7 +1694,7 @@ lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
uint32 entry_offset;
int buf_offset = 0;
Assert(!neon_enable_new_communicator);
Assert(!neon_use_communicator_worker);
if (lfc_maybe_disabled()) /* fast exit if file cache is disabled */
return;
@@ -2211,7 +2211,7 @@ get_local_cache_state(PG_FUNCTION_ARGS)
size_t max_entries = PG_ARGISNULL(0) ? lfc_prewarm_limit : PG_GETARG_INT32(0);
FileCacheState* fcs;
if (neon_enable_new_communicator)
if (neon_use_communicator_worker)
elog(ERROR, "TODO: not implemented");
fcs = lfc_get_state(max_entries);
@@ -2231,7 +2231,7 @@ prewarm_local_cache(PG_FUNCTION_ARGS)
uint32 n_workers = PG_GETARG_INT32(1);
FileCacheState* fcs;
if (neon_enable_new_communicator)
if (neon_use_communicator_worker)
elog(ERROR, "TODO: not implemented");
fcs = (FileCacheState*)state;
@@ -2254,7 +2254,7 @@ get_prewarm_info(PG_FUNCTION_ARGS)
uint32 total_pages;
size_t n_workers;
if (neon_enable_new_communicator)
if (neon_use_communicator_worker)
elog(ERROR, "TODO: not implemented");
if (lfc_size_limit == 0)

View File

@@ -123,7 +123,7 @@ static uint64 pagestore_local_counter = 0;
typedef enum PSConnectionState {
PS_Disconnected, /* no connection yet */
PS_Connecting_Startup, /* connection starting up */
PS_Connecting_PageStream, /* negotiating pagestream */
PS_Connecting_PageStream, /* negotiating pagestream */
PS_Connected, /* connected, pagestream established */
} PSConnectionState;
@@ -253,7 +253,7 @@ AssignPageserverConnstring(const char *newval, void *extra)
* In that case, the shard map is loaded from 'neon.pageserver_grpc_urls'
* instead, and that happens in the communicator process only.
*/
if (neon_enable_new_communicator)
if (neon_use_communicator_worker)
return;
/*
@@ -395,7 +395,7 @@ get_shard_number(BufferTag *tag)
}
static inline void
CLEANUP_AND_DISCONNECT(PageServer *shard)
CLEANUP_AND_DISCONNECT(PageServer *shard)
{
if (shard->wes_read)
{
@@ -417,7 +417,7 @@ CLEANUP_AND_DISCONNECT(PageServer *shard)
* complete the connection (e.g. due to receiving an earlier cancellation
* during connection start).
* Returns true if successfully connected; false if the connection failed.
*
*
* Throws errors in unrecoverable situations, or when this backend's query
* is canceled.
*/

View File

@@ -52,7 +52,6 @@ PG_MODULE_MAGIC;
void _PG_init(void);
bool neon_enable_new_communicator;
static int running_xacts_overflow_policy;
static bool monitor_query_exec_time = false;
@@ -468,10 +467,10 @@ _PG_init(void)
#endif
DefineCustomBoolVariable(
"neon.enable_new_communicator",
"Enables new communicator implementation",
"neon.use_communicator_worker",
"Uses the communicator worker implementation",
NULL,
&neon_enable_new_communicator,
&neon_use_communicator_worker,
true,
PGC_POSTMASTER,
0,
@@ -483,7 +482,7 @@ _PG_init(void)
init_lwlsncache();
pg_init_communicator();
if (neon_enable_new_communicator)
if (neon_use_communicator_worker)
pg_init_communicator_new();
Custom_XLogReaderRoutines = NeonOnDemandXLogReaderRoutines;
@@ -648,7 +647,7 @@ approximate_working_set_size_seconds(PG_FUNCTION_ARGS)
duration = PG_ARGISNULL(0) ? (time_t) -1 : PG_GETARG_INT32(0);
if (neon_enable_new_communicator)
if (neon_use_communicator_worker)
dc = communicator_new_approximate_working_set_size_seconds(duration, false);
else
dc = lfc_approximate_working_set_size_seconds(duration, false);
@@ -664,7 +663,7 @@ approximate_working_set_size(PG_FUNCTION_ARGS)
bool reset = PG_GETARG_BOOL(0);
int32 dc;
if (neon_enable_new_communicator)
if (neon_use_communicator_worker)
dc = communicator_new_approximate_working_set_size_seconds(-1, reset);
else
dc = lfc_approximate_working_set_size_seconds(-1, reset);

View File

@@ -13,7 +13,6 @@
#include "utils/wait_event.h"
/* GUCs */
extern bool neon_enable_new_communicator;
extern char *neon_auth_token;
extern char *neon_timeline;
extern char *neon_tenant;

View File

@@ -822,7 +822,7 @@ neon_exists(SMgrRelation reln, ForkNumber forkNum)
return false;
}
if (neon_enable_new_communicator)
if (neon_use_communicator_worker)
return communicator_new_rel_exists(InfoFromSMgrRel(reln), forkNum);
else
{
@@ -900,7 +900,7 @@ neon_create(SMgrRelation reln, ForkNumber forkNum, bool isRedo)
* that's being replayed, so we should not have the correctness issue
* mentioned in previous paragraph.
*/
if (neon_enable_new_communicator)
if (neon_use_communicator_worker)
{
XLogRecPtr lsn = neon_get_write_lsn();
@@ -961,7 +961,7 @@ neon_unlink(NRelFileInfoBackend rinfo, ForkNumber forkNum, bool isRedo)
if (!NRelFileInfoBackendIsTemp(rinfo))
{
if (neon_enable_new_communicator)
if (neon_use_communicator_worker)
{
XLogRecPtr lsn = neon_get_write_lsn();
@@ -1055,7 +1055,7 @@ neon_extend(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno,
forkNum, blkno,
(uint32) (lsn >> 32), (uint32) lsn);
if (neon_enable_new_communicator)
if (neon_use_communicator_worker)
{
// FIXME: this can pass lsn == invalid. Is that ok?
communicator_new_rel_extend(InfoFromSMgrRel(reln), forkNum, blkno, (const void *) buffer, lsn);
@@ -1182,7 +1182,7 @@ neon_zeroextend(SMgrRelation reln, ForkNumber forkNum, BlockNumber start_block,
lsn = XLogInsert(RM_XLOG_ID, XLOG_FPI);
if (!neon_enable_new_communicator)
if (!neon_use_communicator_worker)
{
for (int i = 0; i < count; i++)
{
@@ -1198,7 +1198,7 @@ neon_zeroextend(SMgrRelation reln, ForkNumber forkNum, BlockNumber start_block,
Assert(lsn != 0);
if (neon_enable_new_communicator)
if (neon_use_communicator_worker)
{
communicator_new_rel_zeroextend(InfoFromSMgrRel(reln), forkNum, start_block, nblocks, lsn);
}
@@ -1266,7 +1266,7 @@ neon_prefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
neon_log(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence);
}
if (neon_enable_new_communicator)
if (neon_use_communicator_worker)
{
communicator_new_prefetch_register_bufferv(InfoFromSMgrRel(reln), forknum, blocknum, nblocks);
return false;
@@ -1276,7 +1276,7 @@ neon_prefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
tag.dbOid = reln->smgr_rlocator.locator.dbOid;
tag.relNumber = reln->smgr_rlocator.locator.relNumber;
tag.forkNum = forknum;
while (nblocks > 0)
{
int iterblocks = Min(nblocks, PG_IOV_MAX);
@@ -1298,7 +1298,7 @@ neon_prefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
blocknum += iterblocks;
}
if (!neon_enable_new_communicator)
if (!neon_use_communicator_worker)
communicator_prefetch_pump_state();
return false;
@@ -1326,7 +1326,7 @@ neon_prefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum)
neon_log(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence);
}
if (neon_enable_new_communicator)
if (neon_use_communicator_worker)
{
communicator_new_prefetch_register_bufferv(InfoFromSMgrRel(reln), forknum, blocknum, 1);
}
@@ -1388,7 +1388,7 @@ neon_writeback(SMgrRelation reln, ForkNumber forknum,
*/
neon_log(SmgrTrace, "writeback noop");
if (!neon_enable_new_communicator)
if (!neon_use_communicator_worker)
communicator_prefetch_pump_state();
if (debug_compare_local)
@@ -1406,7 +1406,7 @@ void
neon_read_at_lsn(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
neon_request_lsns request_lsns, void *buffer)
{
if (neon_enable_new_communicator)
if (neon_use_communicator_worker)
{
// FIXME: request_lsns is ignored. That affects the neon_test_utils callers.
// Add the capability to specify the LSNs explicitly, for the sake of neon_test_utils ?
@@ -1539,7 +1539,7 @@ neon_read(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno, void *buffer
neon_log(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence);
}
if (neon_enable_new_communicator)
if (neon_use_communicator_worker)
{
communicator_new_read_at_lsnv(InfoFromSMgrRel(reln), forkNum, blkno,
(void *) &buffer, 1);
@@ -1650,12 +1650,12 @@ neon_readv(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
nblocks, PG_IOV_MAX);
/* Try to read PS results if they are available */
if (!neon_enable_new_communicator)
if (!neon_use_communicator_worker)
communicator_prefetch_pump_state();
memset(read_pages, 0, sizeof(read_pages));
if (neon_enable_new_communicator)
if (neon_use_communicator_worker)
{
communicator_new_read_at_lsnv(InfoFromSMgrRel(reln), forknum, blocknum,
buffers, nblocks);
@@ -1664,7 +1664,7 @@ neon_readv(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
{
neon_get_request_lsns(InfoFromSMgrRel(reln), forknum, blocknum,
request_lsns, nblocks);
prefetch_result = communicator_prefetch_lookupv(InfoFromSMgrRel(reln), forknum,
blocknum, request_lsns, nblocks,
buffers, read_pages);
@@ -1811,7 +1811,7 @@ neon_write(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, const vo
forknum, blocknum,
(uint32) (lsn >> 32), (uint32) lsn);
if (neon_enable_new_communicator)
if (neon_use_communicator_worker)
{
communicator_new_write_page(InfoFromSMgrRel(reln), forknum, blocknum, buffer, lsn);
}
@@ -1881,7 +1881,7 @@ neon_writev(SMgrRelation reln, ForkNumber forknum, BlockNumber blkno,
neon_wallog_pagev(reln, forknum, blkno, nblocks, (const char **) buffers, false);
if (neon_enable_new_communicator)
if (neon_use_communicator_worker)
{
for (int i = 0; i < nblocks; i++)
{
@@ -1936,7 +1936,7 @@ neon_nblocks(SMgrRelation reln, ForkNumber forknum)
neon_log(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence);
}
if (neon_enable_new_communicator)
if (neon_use_communicator_worker)
{
n_blocks = communicator_new_rel_nblocks(InfoFromSMgrRel(reln), forknum);
}
@@ -1976,7 +1976,7 @@ neon_dbsize(Oid dbNode)
neon_request_lsns request_lsns;
NRelFileInfo dummy_node = {0};
if (neon_enable_new_communicator)
if (neon_use_communicator_worker)
{
db_size = communicator_new_dbsize(dbNode);
}
@@ -2023,7 +2023,7 @@ neon_truncate(SMgrRelation reln, ForkNumber forknum, BlockNumber old_blocks, Blo
neon_log(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence);
}
if (neon_enable_new_communicator)
if (neon_use_communicator_worker)
{
XLogRecPtr lsn = neon_get_write_lsn();
@@ -2104,7 +2104,7 @@ neon_immedsync(SMgrRelation reln, ForkNumber forknum)
neon_log(SmgrTrace, "[NEON_SMGR] immedsync noop");
if (!neon_enable_new_communicator)
if (!neon_use_communicator_worker)
communicator_prefetch_pump_state();
if (debug_compare_local)
@@ -2291,7 +2291,7 @@ neon_end_unlogged_build(SMgrRelation reln)
nblocks = mdnblocks(reln, MAIN_FORKNUM);
recptr = GetXLogInsertRecPtr();
if (!neon_enable_new_communicator)
if (!neon_use_communicator_worker)
{
neon_set_lwlsn_block_range(recptr,
InfoFromNInfoB(rinfob),
@@ -2308,7 +2308,7 @@ neon_end_unlogged_build(SMgrRelation reln)
RelFileInfoFmt(InfoFromNInfoB(rinfob)),
forknum);
if (neon_enable_new_communicator)
if (neon_use_communicator_worker)
{
communicator_new_update_cached_rel_size(InfoFromSMgrRel(reln), forknum, nblocks, recptr);
}
@@ -2384,8 +2384,8 @@ neon_read_slru_segment(SMgrRelation reln, const char* path, int segno, void* buf
request_lsns.not_modified_since = not_modified_since;
request_lsns.effective_request_lsn = request_lsn;
if (neon_enable_new_communicator)
n_blocks = communicator_new_read_slru_segment(kind, segno, buffer);
if (neon_use_communicator_worker)
n_blocks = communicator_new_read_slru_segment(kind, (uint32_t)segno, &request_lsns, path);
else
n_blocks = communicator_read_slru_segment(kind, segno, &request_lsns, buffer);
@@ -2424,7 +2424,7 @@ AtEOXact_neon(XactEvent event, void *arg)
}
break;
}
if (!neon_enable_new_communicator)
if (!neon_use_communicator_worker)
communicator_reconfigure_timeout_if_needed();
}
@@ -2483,7 +2483,7 @@ smgr_init_neon(void)
smgr_init_standard();
neon_init();
if (neon_enable_new_communicator)
if (neon_use_communicator_worker)
communicator_new_init();
else
communicator_init();
@@ -2498,7 +2498,7 @@ neon_extend_rel_size(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
/* This is only used in WAL replay */
Assert(RecoveryInProgress());
if (neon_enable_new_communicator)
if (neon_use_communicator_worker)
{
relsize = communicator_new_rel_nblocks(rinfo, forknum);
@@ -2677,7 +2677,7 @@ neon_redo_read_buffer_filter(XLogReaderState *record, uint8 block_id)
* We should perform this check after assigning LwLSN to prevent
* prefetching of some older version of the page by some other backend.
*/
if (neon_enable_new_communicator)
if (neon_use_communicator_worker)
no_redo_needed = communicator_new_cache_contains(rinfo, forknum, blkno);
else
no_redo_needed = !lfc_cache_contains(rinfo, forknum, blkno);

View File

@@ -23,9 +23,7 @@
#include "utils/dynahash.h"
#include "utils/guc.h"
#if PG_VERSION_NUM >= 150000
#include "miscadmin.h"
#endif
typedef struct
{
@@ -100,7 +98,7 @@ get_cached_relsize(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber *size)
{
bool found = false;
Assert(!neon_enable_new_communicator);
Assert(!neon_use_communicator_worker);
if (relsize_hash_size > 0)
{
@@ -133,7 +131,7 @@ get_cached_relsize(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber *size)
void
set_cached_relsize(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber size)
{
Assert(!neon_enable_new_communicator);
Assert(!neon_use_communicator_worker);
if (relsize_hash_size > 0)
{
@@ -183,7 +181,7 @@ set_cached_relsize(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber size)
void
update_cached_relsize(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber size)
{
Assert(!neon_enable_new_communicator);
Assert(!neon_use_communicator_worker);
if (relsize_hash_size > 0)
{
@@ -219,7 +217,7 @@ update_cached_relsize(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber size)
void
forget_cached_relsize(NRelFileInfo rinfo, ForkNumber forknum)
{
Assert(!neon_enable_new_communicator);
Assert(!neon_use_communicator_worker);
if (relsize_hash_size > 0)
{

View File

@@ -4369,9 +4369,9 @@ class Endpoint(PgProtocol, LogUtils):
# XXX: By checking for None, we enable the new communicator for all tests
# by default
if grpc or grpc is None:
config_lines += [f"neon.enable_new_communicator=on"]
config_lines += ["neon.use_communicator_worker=on"]
else:
config_lines += [f"neon.enable_new_communicator=off"]
config_lines += ["neon.use_communicator_worker=off"]
# Delete file cache if it exists (and we're recreating the endpoint)
if USE_LFC:

View File

@@ -17,7 +17,9 @@ def check_tenant(
config_lines = [
f"neon.safekeeper_proto_version = {safekeeper_proto_version}",
]
endpoint = env.endpoints.create_start("main", tenant_id=tenant_id, config_lines=config_lines, grpc=True)
endpoint = env.endpoints.create_start(
"main", tenant_id=tenant_id, config_lines=config_lines, grpc=True
)
# we rely upon autocommit after each statement
res_1 = endpoint.safe_psql_many(
queries=[

View File

@@ -1,18 +1,18 @@
{
"v17": [
"17.5",
"eac5279cd147d4086e0eb242198aae2f4b766d7b"
"160d0b52d66f4a5d21251a2912a50561bf600333"
],
"v16": [
"16.9",
"51194dc5ce2e3523068d8607852e6c3125a17e58"
"1486f919d4dc21637407ee7ed203497bb5bd516a"
],
"v15": [
"15.13",
"24313bf8f3de722968a2fdf764de7ef77ed64f06"
"9d19780350c0c7b536312dc3b891ade55628bc7b"
],
"v14": [
"14.18",
"ac3c460e01a31f11fb52fd8d8e88e60f0e1069b4"
"1cb207d1c9efb1f6c6f864a47bf45e992a7f0eb0"
]
}