diff --git a/pgxn/neon/Makefile b/pgxn/neon/Makefile index 322ab039f5..3ea7a946cf 100644 --- a/pgxn/neon/Makefile +++ b/pgxn/neon/Makefile @@ -9,6 +9,7 @@ OBJS = \ extension_server.o \ file_cache.o \ hll.o \ + lfc_prewarm.o \ libpagestore.o \ logical_replication_monitor.o \ neon.o \ diff --git a/pgxn/neon/communicator/src/backend_interface.rs b/pgxn/neon/communicator/src/backend_interface.rs index 45715abee5..abc982193e 100644 --- a/pgxn/neon/communicator/src/backend_interface.rs +++ b/pgxn/neon/communicator/src/backend_interface.rs @@ -201,6 +201,54 @@ pub extern "C" fn bcomm_cache_contains( ) } +#[repr(C)] +#[derive(Clone, Debug)] +pub struct FileCacheIterator { + next_bucket: u64, + + pub spc_oid: COid, + pub db_oid: COid, + pub rel_number: u32, + pub fork_number: u8, + pub block_number: u32, +} + +/// Iterate over LFC contents +#[unsafe(no_mangle)] +pub extern "C" fn bcomm_cache_iterate_begin(_bs: &mut CommunicatorBackendStruct, iter: *mut FileCacheIterator) { + unsafe { (*iter).next_bucket = 0 }; +} +#[unsafe(no_mangle)] +pub extern "C" fn bcomm_cache_iterate_next(bs: &mut CommunicatorBackendStruct, iter: *mut FileCacheIterator) -> bool { + use crate::integrated_cache::GetBucketResult; + loop { + let next_bucket = unsafe { (*iter).next_bucket } as usize; + match bs.integrated_cache.get_bucket(next_bucket) { + GetBucketResult::Occupied(rel, blk) => { + unsafe { + (*iter).spc_oid = rel.spcnode; + (*iter).db_oid = rel.dbnode; + (*iter).rel_number = rel.relnode; + (*iter).fork_number = rel.forknum; + (*iter).block_number = blk; + + (*iter).next_bucket += 1; + } + break true; + }, + GetBucketResult::Vacant => { + unsafe { + (*iter).next_bucket += 1; + } + continue; + } + GetBucketResult::OutOfBounds => { + break false; + } + } + } +} + impl<'t> CommunicatorBackendStruct<'t> { /// The slot must be free, or this panics. 
pub(crate) fn start_neon_io_request(&mut self, request_slot_idx: i32, request: &NeonIORequest) { diff --git a/pgxn/neon/communicator/src/integrated_cache.rs b/pgxn/neon/communicator/src/integrated_cache.rs index a69af44492..e43e76b1b5 100644 --- a/pgxn/neon/communicator/src/integrated_cache.rs +++ b/pgxn/neon/communicator/src/integrated_cache.rs @@ -717,6 +717,12 @@ fn get_rel_size( } } +pub enum GetBucketResult { + Occupied(RelTag, u32), + Vacant, + OutOfBounds, +} + /// Accessor for other backends /// /// This allows backends to read pages from the cache directly, on their own, without making a @@ -739,6 +745,21 @@ impl<'t> IntegratedCacheReadAccess<'t> { .get(&BlockKey::from((rel, block_number))) .is_some() } + + pub fn get_bucket(&self, bucket_no: usize) -> GetBucketResult { + match self.block_map.get_at_bucket(bucket_no).as_deref() { + None => { + // free bucket, or out of bounds + if bucket_no >= self.block_map.get_num_buckets() { + GetBucketResult::OutOfBounds + } else { + GetBucketResult::Vacant + } + } + Some((key, _)) => GetBucketResult::Occupied(key.rel, key.block_number), + } + } + } pub struct BackendCacheReadOp<'t> { diff --git a/pgxn/neon/communicator_new.c b/pgxn/neon/communicator_new.c index 68501f4ca2..cb0bbc5ee0 100644 --- a/pgxn/neon/communicator_new.c +++ b/pgxn/neon/communicator_new.c @@ -40,6 +40,7 @@ #include "storage/spin.h" #include "tcop/tcopprot.h" +#include "bitmap.h" #include "communicator_new.h" #include "hll.h" #include "neon.h" @@ -670,6 +671,45 @@ communicator_new_cache_contains(NRelFileInfo rinfo, ForkNumber forkNum, blockno); } +/* Dump a list of blocks in the LFC, for use in prewarming later */ +FileCacheState * +communicator_new_get_lfc_state(size_t max_entries) +{ + struct FileCacheIterator iter; + FileCacheState* fcs; + uint8 *bitmap; + /* TODO: Max(max_entries, ) */ + size_t n_entries = max_entries; + size_t state_size = FILE_CACHE_STATE_SIZE_FOR_CHUNKS(n_entries, 1); + size_t n_pages = 0; + + fcs = (FileCacheState *) palloc0(state_size); + SET_VARSIZE(fcs, state_size); + fcs->magic = FILE_CACHE_STATE_MAGIC; + fcs->chunk_size_log = 0; + fcs->n_chunks = n_entries; + bitmap = FILE_CACHE_STATE_BITMAP(fcs); + + bcomm_cache_iterate_begin(my_bs, &iter); + while (n_pages < max_entries && bcomm_cache_iterate_next(my_bs, &iter)) + { + BufferTag tag; + + BufTagInit(tag, iter.rel_number, iter.fork_number, iter.block_number, iter.spc_oid, iter.db_oid); + fcs->chunks[n_pages] = tag; + n_pages++; + } + + /* fill bitmap. TODO: memset would be more efficient, but this is a silly format anyway */ + for (size_t i = 0; i < n_pages; i++) + { + BITMAP_SET(bitmap, i); + } + fcs->n_pages = n_pages; + + return fcs; +} + /* * Drain all in-flight requests from the queue. 
* diff --git a/pgxn/neon/communicator_new.h b/pgxn/neon/communicator_new.h index 7fbc167f0f..8de2fab57a 100644 --- a/pgxn/neon/communicator_new.h +++ b/pgxn/neon/communicator_new.h @@ -12,6 +12,7 @@ #ifndef COMMUNICATOR_NEW_H #define COMMUNICATOR_NEW_H +#include "lfc_prewarm.h" #include "neon_pgversioncompat.h" #include "storage/buf_internals.h" @@ -61,4 +62,6 @@ extern void communicator_new_update_cached_rel_size(NRelFileInfo rinfo, ForkNumb /* other functions */ extern int32 communicator_new_approximate_working_set_size_seconds(time_t duration, bool reset); +extern FileCacheState *communicator_new_get_lfc_state(size_t max_entries); + #endif /* COMMUNICATOR_NEW_H */ diff --git a/pgxn/neon/file_cache.c b/pgxn/neon/file_cache.c index 91d8dac274..7c408c82da 100644 --- a/pgxn/neon/file_cache.c +++ b/pgxn/neon/file_cache.c @@ -134,15 +134,6 @@ typedef struct FileCacheEntry #define N_COND_VARS 64 #define CV_WAIT_TIMEOUT 10 -#define MAX_PREWARM_WORKERS 8 - -typedef struct PrewarmWorkerState -{ - uint32 prewarmed_pages; - uint32 skipped_pages; - TimestampTz completed; -} PrewarmWorkerState; - typedef struct FileCacheControl { uint64 generation; /* generation is needed to handle correct hash @@ -188,47 +179,27 @@ typedef struct FileCacheControl * again. */ HyperLogLogState wss_estimation; - - /* Prewarmer state */ - PrewarmWorkerState prewarm_workers[MAX_PREWARM_WORKERS]; - size_t n_prewarm_workers; - size_t n_prewarm_entries; - size_t total_prewarm_pages; - size_t prewarm_batch; - bool prewarm_active; - bool prewarm_canceled; - dsm_handle prewarm_lfc_state_handle; } FileCacheControl; -#define FILE_CACHE_STATE_MAGIC 0xfcfcfcfc - -#define FILE_CACHE_STATE_BITMAP(fcs) ((uint8*)&(fcs)->chunks[(fcs)->n_chunks]) -#define FILE_CACHE_STATE_SIZE_FOR_CHUNKS(n_chunks) (sizeof(FileCacheState) + (n_chunks)*sizeof(BufferTag) + (((n_chunks) * lfc_blocks_per_chunk)+7)/8) -#define FILE_CACHE_STATE_SIZE(fcs) (sizeof(FileCacheState) + (fcs->n_chunks)*sizeof(BufferTag) + (((fcs->n_chunks) << fcs->chunk_size_log)+7)/8) - static HTAB *lfc_hash; static int lfc_desc = -1; static LWLockId lfc_lock; int lfc_max_size; int lfc_size_limit; -static int lfc_prewarm_limit; -static int lfc_prewarm_batch; static int lfc_chunk_size_log = MAX_BLOCKS_PER_CHUNK_LOG; static int lfc_blocks_per_chunk = MAX_BLOCKS_PER_CHUNK; char *lfc_path; static uint64 lfc_generation; static FileCacheControl *lfc_ctl; -static bool lfc_do_prewarm; bool lfc_store_prefetch_result; bool lfc_prewarm_update_ws_estimation; -bool AmPrewarmWorker; +bool lfc_do_prewarm; +bool lfc_prewarm_cancel; #define LFC_ENABLED() (lfc_ctl->limit != 0) -PGDLLEXPORT void lfc_prewarm_main(Datum main_arg); - /* * Close LFC file if opened. * All backends should close their LFC files once LFC is disabled. @@ -611,34 +582,13 @@ lfc_init(void) lfc_check_chunk_size, lfc_change_chunk_size, NULL); - - DefineCustomIntVariable("neon.file_cache_prewarm_limit", - "Maximal number of prewarmed chunks", - NULL, - &lfc_prewarm_limit, - INT_MAX, /* no limit by default */ - 0, - INT_MAX, - PGC_SIGHUP, - 0, - NULL, - NULL, - NULL); - - DefineCustomIntVariable("neon.file_cache_prewarm_batch", - "Number of pages retrivied by prewarm from page server", - NULL, - &lfc_prewarm_batch, - 64, - 1, - INT_MAX, - PGC_SIGHUP, - 0, - NULL, - NULL, - NULL); } +/* + * Dump a list of pages that are currently in the LFC + * + * This is used to get a snapshot that can be used to prewarm the LFC later. 
+ */ FileCacheState* lfc_get_state(size_t max_entries) { @@ -656,7 +606,7 @@ lfc_get_state(size_t max_entries) uint8* bitmap; size_t n_pages = 0; size_t n_entries = Min(max_entries, lfc_ctl->used - lfc_ctl->pinned); - size_t state_size = FILE_CACHE_STATE_SIZE_FOR_CHUNKS(n_entries); + size_t state_size = FILE_CACHE_STATE_SIZE_FOR_CHUNKS(n_entries, lfc_blocks_per_chunk); fcs = (FileCacheState*)palloc0(state_size); SET_VARSIZE(fcs, state_size); fcs->magic = FILE_CACHE_STATE_MAGIC; @@ -690,270 +640,6 @@ lfc_get_state(size_t max_entries) return fcs; } -/* - * Prewarm LFC cache to the specified state. It uses lfc_prefetch function to load prewarmed page without hoilding shared buffer lock - * and avoid race conditions with other backends. - */ -void -lfc_prewarm(FileCacheState* fcs, uint32 n_workers) -{ - size_t fcs_chunk_size_log; - size_t n_entries; - size_t prewarm_batch = Min(lfc_prewarm_batch, readahead_buffer_size); - size_t fcs_size; - dsm_segment *seg; - BackgroundWorkerHandle* bgw_handle[MAX_PREWARM_WORKERS]; - - Assert(!neon_use_communicator_worker); - - if (!lfc_ensure_opened()) - return; - - if (prewarm_batch == 0 || lfc_prewarm_limit == 0 || n_workers == 0) - { - elog(LOG, "LFC: prewarm is disabled"); - return; - } - - if (n_workers > MAX_PREWARM_WORKERS) - { - elog(ERROR, "LFC: Too much prewarm workers, maximum is %d", MAX_PREWARM_WORKERS); - } - - if (fcs == NULL || fcs->n_chunks == 0) - { - elog(LOG, "LFC: nothing to prewarm"); - return; - } - - if (fcs->magic != FILE_CACHE_STATE_MAGIC) - { - elog(ERROR, "LFC: Invalid file cache state magic: %X", fcs->magic); - } - - fcs_size = VARSIZE(fcs); - if (FILE_CACHE_STATE_SIZE(fcs) != fcs_size) - { - elog(ERROR, "LFC: Invalid file cache state size: %u vs. %u", (unsigned)FILE_CACHE_STATE_SIZE(fcs), VARSIZE(fcs)); - } - - fcs_chunk_size_log = fcs->chunk_size_log; - if (fcs_chunk_size_log > MAX_BLOCKS_PER_CHUNK_LOG) - { - elog(ERROR, "LFC: Invalid chunk size log: %u", fcs->chunk_size_log); - } - - n_entries = Min(fcs->n_chunks, lfc_prewarm_limit); - Assert(n_entries != 0); - - LWLockAcquire(lfc_lock, LW_EXCLUSIVE); - - /* Do not prewarm more entries than LFC limit */ - if (lfc_ctl->limit <= lfc_ctl->size) - { - elog(LOG, "LFC: skip prewarm because LFC is already filled"); - LWLockRelease(lfc_lock); - return; - } - - if (lfc_ctl->prewarm_active) - { - LWLockRelease(lfc_lock); - elog(ERROR, "LFC: skip prewarm because another prewarm is still active"); - } - lfc_ctl->n_prewarm_entries = n_entries; - lfc_ctl->n_prewarm_workers = n_workers; - lfc_ctl->prewarm_active = true; - lfc_ctl->prewarm_canceled = false; - lfc_ctl->prewarm_batch = prewarm_batch; - memset(lfc_ctl->prewarm_workers, 0, n_workers*sizeof(PrewarmWorkerState)); - - LWLockRelease(lfc_lock); - - /* Calculate total number of pages to be prewarmed */ - lfc_ctl->total_prewarm_pages = fcs->n_pages; - - seg = dsm_create(fcs_size, 0); - memcpy(dsm_segment_address(seg), fcs, fcs_size); - lfc_ctl->prewarm_lfc_state_handle = dsm_segment_handle(seg); - - /* Spawn background workers */ - for (uint32 i = 0; i < n_workers; i++) - { - BackgroundWorker worker = {0}; - - worker.bgw_flags = BGWORKER_SHMEM_ACCESS; - worker.bgw_start_time = BgWorkerStart_ConsistentState; - worker.bgw_restart_time = BGW_NEVER_RESTART; - strcpy(worker.bgw_library_name, "neon"); - strcpy(worker.bgw_function_name, "lfc_prewarm_main"); - snprintf(worker.bgw_name, BGW_MAXLEN, "LFC prewarm worker %d", i+1); - strcpy(worker.bgw_type, "LFC prewarm worker"); - worker.bgw_main_arg = Int32GetDatum(i); - /* must set notify PID to 
wait for shutdown */ - worker.bgw_notify_pid = MyProcPid; - - if (!RegisterDynamicBackgroundWorker(&worker, &bgw_handle[i])) - { - ereport(LOG, - (errcode(ERRCODE_INSUFFICIENT_RESOURCES), - errmsg("LFC: registering dynamic bgworker prewarm failed"), - errhint("Consider increasing the configuration parameter \"%s\".", "max_worker_processes"))); - n_workers = i; - lfc_ctl->prewarm_canceled = true; - break; - } - } - - for (uint32 i = 0; i < n_workers; i++) - { - bool interrupted; - do - { - interrupted = false; - PG_TRY(); - { - BgwHandleStatus status = WaitForBackgroundWorkerShutdown(bgw_handle[i]); - if (status != BGWH_STOPPED && status != BGWH_POSTMASTER_DIED) - { - elog(LOG, "LFC: Unexpected status of prewarm worker termination: %d", status); - } - } - PG_CATCH(); - { - elog(LOG, "LFC: cancel prewarm"); - lfc_ctl->prewarm_canceled = true; - interrupted = true; - } - PG_END_TRY(); - } while (interrupted); - - if (!lfc_ctl->prewarm_workers[i].completed) - { - /* Background worker doesn't set completion time: it means that it was abnormally terminated */ - elog(LOG, "LFC: prewarm worker %d failed", i+1); - /* Set completion time to prevent get_prewarm_info from considering this worker as active */ - lfc_ctl->prewarm_workers[i].completed = GetCurrentTimestamp(); - } - } - dsm_detach(seg); - - LWLockAcquire(lfc_lock, LW_EXCLUSIVE); - lfc_ctl->prewarm_active = false; - LWLockRelease(lfc_lock); -} - -void -lfc_prewarm_main(Datum main_arg) -{ - size_t snd_idx = 0, rcv_idx = 0; - size_t n_sent = 0, n_received = 0; - size_t fcs_chunk_size_log; - size_t max_prefetch_pages; - size_t prewarm_batch; - size_t n_workers; - dsm_segment *seg; - FileCacheState* fcs; - uint8* bitmap; - BufferTag tag; - PrewarmWorkerState* ws; - uint32 worker_id = DatumGetInt32(main_arg); - - Assert(!neon_use_communicator_worker); - - AmPrewarmWorker = true; - - pqsignal(SIGTERM, die); - BackgroundWorkerUnblockSignals(); - - seg = dsm_attach(lfc_ctl->prewarm_lfc_state_handle); - if (seg == NULL) - ereport(ERROR, - (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("could not map dynamic shared memory segment"))); - - fcs = (FileCacheState*) dsm_segment_address(seg); - prewarm_batch = lfc_ctl->prewarm_batch; - fcs_chunk_size_log = fcs->chunk_size_log; - n_workers = lfc_ctl->n_prewarm_workers; - max_prefetch_pages = lfc_ctl->n_prewarm_entries << fcs_chunk_size_log; - ws = &lfc_ctl->prewarm_workers[worker_id]; - bitmap = FILE_CACHE_STATE_BITMAP(fcs); - - /* enable prefetch in LFC */ - lfc_store_prefetch_result = true; - lfc_do_prewarm = true; /* Flag for lfc_prefetch preventing replacement of existed entries if LFC cache is full */ - - elog(LOG, "LFC: worker %d start prewarming", worker_id); - while (!lfc_ctl->prewarm_canceled) - { - if (snd_idx < max_prefetch_pages) - { - if ((snd_idx >> fcs_chunk_size_log) % n_workers != worker_id) - { - /* If there are multiple workers, split chunks between them */ - snd_idx += 1 << fcs_chunk_size_log; - } - else - { - if (BITMAP_ISSET(bitmap, snd_idx)) - { - tag = fcs->chunks[snd_idx >> fcs_chunk_size_log]; - tag.blockNum += snd_idx & ((1 << fcs_chunk_size_log) - 1); - if (!lfc_cache_contains(BufTagGetNRelFileInfo(tag), tag.forkNum, tag.blockNum)) - { - (void)communicator_prefetch_register_bufferv(tag, NULL, 1, NULL); - n_sent += 1; - } - else - { - ws->skipped_pages += 1; - BITMAP_CLR(bitmap, snd_idx); - } - } - snd_idx += 1; - } - } - if (n_sent >= n_received + prewarm_batch || snd_idx == max_prefetch_pages) - { - if (n_received == n_sent && snd_idx == max_prefetch_pages) - { - 
break; - } - if ((rcv_idx >> fcs_chunk_size_log) % n_workers != worker_id) - { - /* Skip chunks processed by other workers */ - rcv_idx += 1 << fcs_chunk_size_log; - continue; - } - - /* Locate next block to prefetch */ - while (!BITMAP_ISSET(bitmap, rcv_idx)) - { - rcv_idx += 1; - } - tag = fcs->chunks[rcv_idx >> fcs_chunk_size_log]; - tag.blockNum += rcv_idx & ((1 << fcs_chunk_size_log) - 1); - if (communicator_prefetch_receive(tag)) - { - ws->prewarmed_pages += 1; - } - else - { - ws->skipped_pages += 1; - } - rcv_idx += 1; - n_received += 1; - } - } - /* No need to perform prefetch cleanup here because prewarm worker will be terminated and - * connection to PS dropped just after return from this function. - */ - Assert(n_sent == n_received || lfc_ctl->prewarm_canceled); - elog(LOG, "LFC: worker %d complete prewarming: loaded %ld pages", worker_id, (long)n_received); - lfc_ctl->prewarm_workers[worker_id].completed = GetCurrentTimestamp(); -} - void lfc_invalidate(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber nblocks) { @@ -1466,7 +1152,7 @@ lfc_init_new_entry(FileCacheEntry* entry, uint32 hash) /* Can't add this chunk - we don't have the space for it */ hash_search_with_hash_value(lfc_hash, &entry->key, hash, HASH_REMOVE, NULL); - lfc_ctl->prewarm_canceled = true; /* cancel prewarm if LFC limit is reached */ + lfc_prewarm_cancel = true; /* cancel prewarm if LFC limit is reached */ return false; } @@ -2176,95 +1862,3 @@ lfc_approximate_working_set_size_seconds(time_t duration, bool reset) memset(lfc_ctl->wss_estimation.regs, 0, sizeof lfc_ctl->wss_estimation.regs); return dc; } - -PG_FUNCTION_INFO_V1(get_local_cache_state); - -Datum -get_local_cache_state(PG_FUNCTION_ARGS) -{ - size_t max_entries = PG_ARGISNULL(0) ? lfc_prewarm_limit : PG_GETARG_INT32(0); - FileCacheState* fcs; - - if (neon_use_communicator_worker) - elog(ERROR, "TODO: not implemented"); - - fcs = lfc_get_state(max_entries); - - if (fcs != NULL) - PG_RETURN_BYTEA_P((bytea*)fcs); - else - PG_RETURN_NULL(); -} - -PG_FUNCTION_INFO_V1(prewarm_local_cache); - -Datum -prewarm_local_cache(PG_FUNCTION_ARGS) -{ - bytea* state = PG_GETARG_BYTEA_PP(0); - uint32 n_workers = PG_GETARG_INT32(1); - FileCacheState* fcs; - - if (neon_use_communicator_worker) - elog(ERROR, "TODO: not implemented"); - - fcs = (FileCacheState*)state; - lfc_prewarm(fcs, n_workers); - - PG_RETURN_NULL(); -} - -PG_FUNCTION_INFO_V1(get_prewarm_info); - -Datum -get_prewarm_info(PG_FUNCTION_ARGS) -{ - Datum values[4]; - bool nulls[4]; - TupleDesc tupdesc; - uint32 prewarmed_pages = 0; - uint32 skipped_pages = 0; - uint32 active_workers = 0; - uint32 total_pages; - size_t n_workers; - - if (neon_use_communicator_worker) - elog(ERROR, "TODO: not implemented"); - - if (lfc_size_limit == 0) - PG_RETURN_NULL(); - - LWLockAcquire(lfc_lock, LW_SHARED); - if (!lfc_ctl || lfc_ctl->n_prewarm_workers == 0) - { - LWLockRelease(lfc_lock); - PG_RETURN_NULL(); - } - n_workers = lfc_ctl->n_prewarm_workers; - total_pages = lfc_ctl->total_prewarm_pages; - for (size_t i = 0; i < n_workers; i++) - { - PrewarmWorkerState* ws = &lfc_ctl->prewarm_workers[i]; - prewarmed_pages += ws->prewarmed_pages; - skipped_pages += ws->skipped_pages; - active_workers += ws->completed != 0; - } - LWLockRelease(lfc_lock); - - tupdesc = CreateTemplateTupleDesc(4); - TupleDescInitEntry(tupdesc, (AttrNumber) 1, "total_pages", INT4OID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 2, "prewarmed_pages", INT4OID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 3, "skipped_pages", INT4OID, -1, 
0); - TupleDescInitEntry(tupdesc, (AttrNumber) 4, "active_workers", INT4OID, -1, 0); - tupdesc = BlessTupleDesc(tupdesc); - - MemSet(nulls, 0, sizeof(nulls)); - - values[0] = Int32GetDatum(total_pages); - values[1] = Int32GetDatum(prewarmed_pages); - values[2] = Int32GetDatum(skipped_pages); - values[3] = Int32GetDatum(active_workers); - - PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls))); -} - diff --git a/pgxn/neon/file_cache.h b/pgxn/neon/file_cache.h index f8056e22ff..fd79eee532 100644 --- a/pgxn/neon/file_cache.h +++ b/pgxn/neon/file_cache.h @@ -11,18 +11,9 @@ #ifndef FILE_CACHE_h #define FILE_CACHE_h -#include "neon_pgversioncompat.h" +#include "lfc_prewarm.h" -typedef struct FileCacheState -{ - int32 vl_len_; /* varlena header (do not touch directly!) */ - uint32 magic; - uint32 n_chunks; - uint32 n_pages; - uint16 chunk_size_log; - BufferTag chunks[FLEXIBLE_ARRAY_MEMBER]; - /* followed by bitmap */ -} FileCacheState; +#include "neon_pgversioncompat.h" /* GUCs */ extern bool lfc_store_prefetch_result; @@ -30,6 +21,9 @@ extern int lfc_max_size; extern int lfc_size_limit; extern char *lfc_path; +extern bool lfc_do_prewarm; +extern bool lfc_prewarm_cancel; + /* functions for local file cache */ extern void lfc_invalidate(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber nblocks); extern void lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, @@ -48,7 +42,6 @@ extern void lfc_init(void); extern bool lfc_prefetch(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno, const void* buffer, XLogRecPtr lsn); extern FileCacheState* lfc_get_state(size_t max_entries); -extern void lfc_prewarm(FileCacheState* fcs, uint32 n_workers); extern int32 lfc_approximate_working_set_size_seconds(time_t duration, bool reset); diff --git a/pgxn/neon/lfc_prewarm.c b/pgxn/neon/lfc_prewarm.c new file mode 100644 index 0000000000..2acb805f9d --- /dev/null +++ b/pgxn/neon/lfc_prewarm.c @@ -0,0 +1,654 @@ +/*------------------------------------------------------------------------- + * + * lfc_prewarm.c + * Functions related to LFC prewarming + * + * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "bitmap.h" +#include "communicator.h" +#include "communicator_new.h" +#include "file_cache.h" +#include "lfc_prewarm.h" +#include "neon.h" +#include "pagestore_client.h" + +#include "funcapi.h" +#include "miscadmin.h" +#include "postmaster/bgworker.h" +#include "storage/dsm.h" +#include "tcop/tcopprot.h" +#include "utils/timestamp.h" + +#define MAX_PREWARM_WORKERS 8 + +typedef struct PrewarmWorkerState +{ + uint32 prewarmed_pages; + uint32 skipped_pages; + TimestampTz completed; +} PrewarmWorkerState; + +typedef struct PrewarmControl +{ + /* -1 when not using workers, 0 when no prewarm has been performed */ + size_t n_prewarm_workers; + size_t total_prewarm_pages; + bool prewarm_active; + bool prewarm_canceled; + + /* These are used in the non-worker mode */ + uint32 prewarmed_pages; + uint32 skipped_pages; + TimestampTz completed; + + /* These are used with workers */ + PrewarmWorkerState prewarm_workers[MAX_PREWARM_WORKERS]; + dsm_handle prewarm_lfc_state_handle; + size_t prewarm_batch; + size_t n_prewarm_entries; +} PrewarmControl; + +static PrewarmControl *prewarm_ctl; + +static int lfc_prewarm_limit; +static int lfc_prewarm_batch; + +static LWLockId prewarm_lock; + +bool 
AmPrewarmWorker;
+
+static void lfc_prewarm_with_workers(FileCacheState *fcs, uint32 n_workers);
+static void lfc_prewarm_with_async_requests(FileCacheState *fcs);
+PGDLLEXPORT void lfc_prewarm_main(Datum main_arg);
+
+void
+pg_init_prewarm(void)
+{
+	DefineCustomIntVariable("neon.file_cache_prewarm_limit",
+							"Maximum number of prewarmed chunks",
+							NULL,
+							&lfc_prewarm_limit,
+							INT_MAX, /* no limit by default */
+							0,
+							INT_MAX,
+							PGC_SIGHUP,
+							0,
+							NULL,
+							NULL,
+							NULL);
+
+	DefineCustomIntVariable("neon.file_cache_prewarm_batch",
+							"Number of pages retrieved from the page server per prewarm batch",
+							NULL,
+							&lfc_prewarm_batch,
+							64,
+							1,
+							INT_MAX,
+							PGC_SIGHUP,
+							0,
+							NULL,
+							NULL,
+							NULL);
+}
+
+static size_t
+PrewarmShmemSize(void)
+{
+	return sizeof(PrewarmControl);
+}
+
+void
+PrewarmShmemRequest(void)
+{
+	RequestAddinShmemSpace(PrewarmShmemSize());
+	RequestNamedLWLockTranche("prewarm_lock", 1);
+}
+
+void
+PrewarmShmemInit(void)
+{
+	bool		found;
+
+	prewarm_ctl = (PrewarmControl *) ShmemInitStruct("Prewarmer shmem state",
+													 PrewarmShmemSize(),
+													 &found);
+	if (!found)
+	{
+		/* it's zeroed already */
+
+		prewarm_lock = (LWLockId) GetNamedLWLockTranche("prewarm_lock");
+	}
+}
+
+static void
+validate_fcs(FileCacheState *fcs)
+{
+	size_t		fcs_size;
+#if 0
+	size_t		fcs_chunk_size_log;
+#endif
+
+	if (fcs->magic != FILE_CACHE_STATE_MAGIC)
+	{
+		elog(ERROR, "LFC: Invalid file cache state magic: %X", fcs->magic);
+	}
+
+	fcs_size = VARSIZE(fcs);
+	if (FILE_CACHE_STATE_SIZE(fcs) != fcs_size)
+	{
+		elog(ERROR, "LFC: Invalid file cache state size: %u vs. %u", (unsigned)FILE_CACHE_STATE_SIZE(fcs), VARSIZE(fcs));
+	}
+
+	/* FIXME */
+#if 0
+	fcs_chunk_size_log = fcs->chunk_size_log;
+	if (fcs_chunk_size_log > MAX_BLOCKS_PER_CHUNK_LOG)
+	{
+		elog(ERROR, "LFC: Invalid chunk size log: %u", fcs->chunk_size_log);
+	}
+#endif
+}
+
+/*
+ * Prewarm the LFC to the specified state. It uses lfc_prefetch to load
+ * prewarmed pages without holding the shared buffer lock, avoiding race
+ * conditions with other backends.
+ */ +void +lfc_prewarm_with_workers(FileCacheState *fcs, uint32 n_workers) +{ + size_t n_entries; + size_t prewarm_batch = Min(lfc_prewarm_batch, readahead_buffer_size); + size_t fcs_size = VARSIZE(fcs); + dsm_segment *seg; + BackgroundWorkerHandle* bgw_handle[MAX_PREWARM_WORKERS]; + + Assert(!neon_use_communicator_worker); + + if (prewarm_batch == 0 || lfc_prewarm_limit == 0 || n_workers == 0) + { + elog(LOG, "LFC: prewarm is disabled"); + return; + } + + if (n_workers > MAX_PREWARM_WORKERS) + { + elog(ERROR, "LFC: too many prewarm workers, maximum is %d", MAX_PREWARM_WORKERS); + } + + if (fcs == NULL || fcs->n_chunks == 0) + { + elog(LOG, "LFC: nothing to prewarm"); + return; + } + + n_entries = Min(fcs->n_chunks, lfc_prewarm_limit); + Assert(n_entries != 0); + + LWLockAcquire(prewarm_lock, LW_EXCLUSIVE); + + /* Do not prewarm more entries than LFC limit */ + /* FIXME */ +#if 0 + if (prewarm_ctl->limit <= prewarm_ctl->size) + { + elog(LOG, "LFC: skip prewarm because LFC is already filled"); + LWLockRelease(prewarm_lock); + return; + } +#endif + + if (prewarm_ctl->prewarm_active) + { + LWLockRelease(prewarm_lock); + elog(ERROR, "LFC: skip prewarm because another prewarm is still active"); + } + prewarm_ctl->n_prewarm_entries = n_entries; + prewarm_ctl->n_prewarm_workers = n_workers; + prewarm_ctl->prewarm_active = true; + prewarm_ctl->prewarm_canceled = false; + prewarm_ctl->prewarm_batch = prewarm_batch; + memset(prewarm_ctl->prewarm_workers, 0, n_workers*sizeof(PrewarmWorkerState)); + + /* Calculate total number of pages to be prewarmed */ + prewarm_ctl->total_prewarm_pages = fcs->n_pages; + + LWLockRelease(prewarm_lock); + + seg = dsm_create(fcs_size, 0); + memcpy(dsm_segment_address(seg), fcs, fcs_size); + prewarm_ctl->prewarm_lfc_state_handle = dsm_segment_handle(seg); + + /* Spawn background workers */ + for (uint32 i = 0; i < n_workers; i++) + { + BackgroundWorker worker = {0}; + + worker.bgw_flags = BGWORKER_SHMEM_ACCESS; + worker.bgw_start_time = BgWorkerStart_ConsistentState; + worker.bgw_restart_time = BGW_NEVER_RESTART; + strcpy(worker.bgw_library_name, "neon"); + strcpy(worker.bgw_function_name, "lfc_prewarm_main"); + snprintf(worker.bgw_name, BGW_MAXLEN, "LFC prewarm worker %d", i+1); + strcpy(worker.bgw_type, "LFC prewarm worker"); + worker.bgw_main_arg = Int32GetDatum(i); + /* must set notify PID to wait for shutdown */ + worker.bgw_notify_pid = MyProcPid; + + if (!RegisterDynamicBackgroundWorker(&worker, &bgw_handle[i])) + { + ereport(LOG, + (errcode(ERRCODE_INSUFFICIENT_RESOURCES), + errmsg("LFC: registering dynamic bgworker prewarm failed"), + errhint("Consider increasing the configuration parameter \"%s\".", "max_worker_processes"))); + n_workers = i; + prewarm_ctl->prewarm_canceled = true; + break; + } + } + + for (uint32 i = 0; i < n_workers; i++) + { + bool interrupted; + do + { + interrupted = false; + PG_TRY(); + { + BgwHandleStatus status = WaitForBackgroundWorkerShutdown(bgw_handle[i]); + if (status != BGWH_STOPPED && status != BGWH_POSTMASTER_DIED) + { + elog(LOG, "LFC: Unexpected status of prewarm worker termination: %d", status); + } + } + PG_CATCH(); + { + elog(LOG, "LFC: cancel prewarm"); + prewarm_ctl->prewarm_canceled = true; + interrupted = true; + } + PG_END_TRY(); + } while (interrupted); + + if (!prewarm_ctl->prewarm_workers[i].completed) + { + /* Background worker doesn't set completion time: it means that it was abnormally terminated */ + elog(LOG, "LFC: prewarm worker %d failed", i+1); + /* Set completion time to prevent get_prewarm_info from 
considering this worker as active */ + prewarm_ctl->prewarm_workers[i].completed = GetCurrentTimestamp(); + } + } + dsm_detach(seg); + + LWLockAcquire(prewarm_lock, LW_EXCLUSIVE); + prewarm_ctl->prewarm_active = false; + LWLockRelease(prewarm_lock); +} + + +void +lfc_prewarm_main(Datum main_arg) +{ + size_t snd_idx = 0, rcv_idx = 0; + size_t n_sent = 0, n_received = 0; + size_t fcs_chunk_size_log; + size_t max_prefetch_pages; + size_t prewarm_batch; + size_t n_workers; + dsm_segment *seg; + FileCacheState* fcs; + uint8* bitmap; + BufferTag tag; + PrewarmWorkerState* ws; + uint32 worker_id = DatumGetInt32(main_arg); + + Assert(!neon_use_communicator_worker); + + AmPrewarmWorker = true; + + pqsignal(SIGTERM, die); + BackgroundWorkerUnblockSignals(); + + seg = dsm_attach(prewarm_ctl->prewarm_lfc_state_handle); + if (seg == NULL) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("could not map dynamic shared memory segment"))); + + fcs = (FileCacheState*) dsm_segment_address(seg); + prewarm_batch = prewarm_ctl->prewarm_batch; + fcs_chunk_size_log = fcs->chunk_size_log; + n_workers = prewarm_ctl->n_prewarm_workers; + max_prefetch_pages = prewarm_ctl->n_prewarm_entries << fcs_chunk_size_log; + ws = &prewarm_ctl->prewarm_workers[worker_id]; + bitmap = FILE_CACHE_STATE_BITMAP(fcs); + + /* enable prefetch in LFC */ + lfc_store_prefetch_result = true; + lfc_do_prewarm = true; /* Flag for lfc_prefetch preventing replacement of existed entries if LFC cache is full */ + + elog(LOG, "LFC: worker %d start prewarming", worker_id); + while (!prewarm_ctl->prewarm_canceled) + { + if (snd_idx < max_prefetch_pages) + { + if ((snd_idx >> fcs_chunk_size_log) % n_workers != worker_id) + { + /* If there are multiple workers, split chunks between them */ + snd_idx += 1 << fcs_chunk_size_log; + } + else + { + if (BITMAP_ISSET(bitmap, snd_idx)) + { + tag = fcs->chunks[snd_idx >> fcs_chunk_size_log]; + tag.blockNum += snd_idx & ((1 << fcs_chunk_size_log) - 1); + if (!lfc_cache_contains(BufTagGetNRelFileInfo(tag), tag.forkNum, tag.blockNum)) + { + (void) communicator_prefetch_register_bufferv(tag, NULL, 1, NULL); + n_sent += 1; + } + else + { + ws->skipped_pages += 1; + BITMAP_CLR(bitmap, snd_idx); + } + } + snd_idx += 1; + } + } + if (n_sent >= n_received + prewarm_batch || snd_idx == max_prefetch_pages) + { + if (n_received == n_sent && snd_idx == max_prefetch_pages) + { + break; + } + if ((rcv_idx >> fcs_chunk_size_log) % n_workers != worker_id) + { + /* Skip chunks processed by other workers */ + rcv_idx += 1 << fcs_chunk_size_log; + continue; + } + + /* Locate next block to prefetch */ + while (!BITMAP_ISSET(bitmap, rcv_idx)) + { + rcv_idx += 1; + } + tag = fcs->chunks[rcv_idx >> fcs_chunk_size_log]; + tag.blockNum += rcv_idx & ((1 << fcs_chunk_size_log) - 1); + if (communicator_prefetch_receive(tag)) + { + ws->prewarmed_pages += 1; + } + else + { + ws->skipped_pages += 1; + } + rcv_idx += 1; + n_received += 1; + } + } + /* No need to perform prefetch cleanup here because prewarm worker will be terminated and + * connection to PS dropped just after return from this function. + */ + Assert(n_sent == n_received || prewarm_ctl->prewarm_canceled); + elog(LOG, "LFC: worker %d complete prewarming: loaded %ld pages", worker_id, (long)n_received); + prewarm_ctl->prewarm_workers[worker_id].completed = GetCurrentTimestamp(); +} + +/* + * Prewarm LFC cache to the specified state. 
Uses the new communicator
+ *
+ * FIXME: Is there a race condition because we're not holding Postgres
+ * buffer manager locks?
+ */
+static void
+lfc_prewarm_with_async_requests(FileCacheState *fcs)
+{
+	size_t		n_entries;
+	uint8	   *bitmap;
+	uint64		bitno;
+	int			blocks_per_chunk;
+
+	Assert(neon_use_communicator_worker);
+
+	if (lfc_prewarm_limit == 0)
+	{
+		elog(LOG, "LFC: prewarm is disabled");
+		return;
+	}
+
+	if (fcs == NULL || fcs->n_chunks == 0)
+	{
+		elog(LOG, "LFC: nothing to prewarm");
+		return;
+	}
+
+	n_entries = Min(fcs->n_chunks, lfc_prewarm_limit);
+	Assert(n_entries != 0);
+
+	LWLockAcquire(prewarm_lock, LW_EXCLUSIVE);
+
+	/* Do not prewarm more entries than LFC limit */
+	/* FIXME */
+#if 0
+	if (prewarm_ctl->limit <= prewarm_ctl->size)
+	{
+		elog(LOG, "LFC: skip prewarm because LFC is already filled");
+		LWLockRelease(prewarm_lock);
+		return;
+	}
+#endif
+
+	if (prewarm_ctl->prewarm_active)
+	{
+		LWLockRelease(prewarm_lock);
+		elog(ERROR, "LFC: skip prewarm because another prewarm is still active");
+	}
+	prewarm_ctl->n_prewarm_entries = n_entries;
+	prewarm_ctl->n_prewarm_workers = -1;
+	prewarm_ctl->prewarm_active = true;
+	prewarm_ctl->prewarm_canceled = false;
+
+	/* Calculate total number of pages to be prewarmed */
+	prewarm_ctl->total_prewarm_pages = fcs->n_pages;
+
+	LWLockRelease(prewarm_lock);
+
+	elog(LOG, "LFC: start prewarming");
+	lfc_do_prewarm = true;
+	lfc_prewarm_cancel = false;
+
+	bitmap = FILE_CACHE_STATE_BITMAP(fcs);
+
+	blocks_per_chunk = 1 << fcs->chunk_size_log;
+
+	bitno = 0;
+	for (uint32 chunkno = 0; chunkno < n_entries; chunkno++)
+	{
+		BufferTag  *chunk_tag = &fcs->chunks[chunkno];
+		BlockNumber request_startblkno = InvalidBlockNumber;
+		BlockNumber request_endblkno;
+
+		if (lfc_prewarm_cancel)
+		{
+			prewarm_ctl->prewarm_canceled = true;
+			break;
+		}
+
+		/* take next chunk */
+		for (int j = 0; j < blocks_per_chunk; j++)
+		{
+			BlockNumber blkno = chunk_tag->blockNum + j;
+
+			if (BITMAP_ISSET(bitmap, bitno))
+			{
+				if (request_startblkno != InvalidBlockNumber)
+				{
+					if (request_endblkno == blkno)
+					{
+						/* append this block to the request */
+						request_endblkno++;
+					}
+					else
+					{
+						/* flush this request, and start a new one */
+						communicator_new_prefetch_register_bufferv(
+							BufTagGetNRelFileInfo(*chunk_tag),
+							chunk_tag->forkNum,
+							request_startblkno,
+							request_endblkno - request_startblkno
+						);
+						request_startblkno = blkno;
+						request_endblkno = blkno + 1;
+					}
+				}
+				else
+				{
+					/* no request in progress, start a new one */
+					request_startblkno = blkno;
+					request_endblkno = blkno + 1;
+				}
+				prewarm_ctl->prewarmed_pages += 1;
+			}
+			bitno++;
+		}
+
+		/* flush the last request of this chunk, if any */
+		if (request_startblkno != InvalidBlockNumber)
+		{
+			communicator_new_prefetch_register_bufferv(
+				BufTagGetNRelFileInfo(*chunk_tag),
+				chunk_tag->forkNum,
+				request_startblkno,
+				request_endblkno - request_startblkno
+			);
+			request_startblkno = request_endblkno = InvalidBlockNumber;
+		}
+	}
+
+	elog(LOG, "LFC: completed prewarming: requested %lu pages", (unsigned long) prewarm_ctl->prewarmed_pages);
+	prewarm_ctl->completed = GetCurrentTimestamp();
+
+	LWLockAcquire(prewarm_lock, LW_EXCLUSIVE);
+	prewarm_ctl->prewarm_active = false;
+	LWLockRelease(prewarm_lock);
+}
+
+PG_FUNCTION_INFO_V1(get_local_cache_state);
+
+Datum
+get_local_cache_state(PG_FUNCTION_ARGS)
+{
+	size_t		max_entries = PG_ARGISNULL(0) ? lfc_prewarm_limit : PG_GETARG_INT32(0);
+	FileCacheState* fcs;
+
+	if (neon_use_communicator_worker)
+		fcs = communicator_new_get_lfc_state(max_entries);
+	else
+		fcs = lfc_get_state(max_entries);
+
+	if (fcs != NULL)
+		PG_RETURN_BYTEA_P((bytea*)fcs);
+	else
+		PG_RETURN_NULL();
+}
+
+PG_FUNCTION_INFO_V1(prewarm_local_cache);
+
+Datum
+prewarm_local_cache(PG_FUNCTION_ARGS)
+{
+	bytea*		state = PG_GETARG_BYTEA_PP(0);
+	uint32		n_workers = PG_GETARG_INT32(1);
+	FileCacheState* fcs;
+
+	fcs = (FileCacheState *)state;
+	validate_fcs(fcs);
+
+	if (neon_use_communicator_worker)
+		lfc_prewarm_with_async_requests(fcs);
+	else
+		lfc_prewarm_with_workers(fcs, n_workers);
+
+	PG_RETURN_NULL();
+}
+
+PG_FUNCTION_INFO_V1(get_prewarm_info);
+
+Datum
+get_prewarm_info(PG_FUNCTION_ARGS)
+{
+	Datum		values[4];
+	bool		nulls[4];
+	TupleDesc	tupdesc;
+	uint32		prewarmed_pages = 0;
+	uint32		skipped_pages = 0;
+	uint32		active_workers = 0;
+	uint32		total_pages;
+
+	if (lfc_size_limit == 0)
+		PG_RETURN_NULL();
+
+	LWLockAcquire(prewarm_lock, LW_SHARED);
+	if (!prewarm_ctl || prewarm_ctl->n_prewarm_workers == 0)
+	{
+		LWLockRelease(prewarm_lock);
+		PG_RETURN_NULL();
+	}
+
+	if (prewarm_ctl->n_prewarm_workers == -1)
+	{
+		total_pages = prewarm_ctl->total_prewarm_pages;
+		prewarmed_pages = prewarm_ctl->prewarmed_pages;
+		skipped_pages = prewarm_ctl->skipped_pages;
+		active_workers = 1;
+	}
+	else
+	{
+		size_t		n_workers;
+
+		n_workers = prewarm_ctl->n_prewarm_workers;
+		total_pages = prewarm_ctl->total_prewarm_pages;
+		for (size_t i = 0; i < n_workers; i++)
+		{
+			PrewarmWorkerState *ws = &prewarm_ctl->prewarm_workers[i];
+
+			prewarmed_pages += ws->prewarmed_pages;
+			skipped_pages += ws->skipped_pages;
+			active_workers += ws->completed != 0;
+		}
+	}
+	LWLockRelease(prewarm_lock);
+
+	tupdesc = CreateTemplateTupleDesc(4);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 1, "total_pages", INT4OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 2, "prewarmed_pages", INT4OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 3, "skipped_pages", INT4OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 4, "active_workers", INT4OID, -1, 0);
+	tupdesc = BlessTupleDesc(tupdesc);
+
+	MemSet(nulls, 0, sizeof(nulls));
+
+	values[0] = Int32GetDatum(total_pages);
+	values[1] = Int32GetDatum(prewarmed_pages);
+	values[2] = Int32GetDatum(skipped_pages);
+	values[3] = Int32GetDatum(active_workers);
+
+	PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
+}
diff --git a/pgxn/neon/lfc_prewarm.h b/pgxn/neon/lfc_prewarm.h
new file mode 100644
index 0000000000..09d224b1fc
--- /dev/null
+++ b/pgxn/neon/lfc_prewarm.h
@@ -0,0 +1,39 @@
+/*-------------------------------------------------------------------------
+ *
+ * lfc_prewarm.h
+ *	  Local File Cache prewarmer
+ *
+ * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef LFC_PREWARM_H
+#define LFC_PREWARM_H
+
+#include "storage/buf_internals.h"
+
+typedef struct FileCacheState
+{
+	int32		vl_len_;		/* varlena header (do not touch directly!)
*/ + uint32 magic; + uint32 n_chunks; + uint32 n_pages; + uint16 chunk_size_log; + BufferTag chunks[FLEXIBLE_ARRAY_MEMBER]; + /* followed by bitmap */ +} FileCacheState; + +#define FILE_CACHE_STATE_MAGIC 0xfcfcfcfc + +#define FILE_CACHE_STATE_BITMAP(fcs) ((uint8*)&(fcs)->chunks[(fcs)->n_chunks]) +#define FILE_CACHE_STATE_SIZE_FOR_CHUNKS(n_chunks, blocks_per_chunk) (sizeof(FileCacheState) + (n_chunks)*sizeof(BufferTag) + (((n_chunks) * blocks_per_chunk)+7)/8) +#define FILE_CACHE_STATE_SIZE(fcs) (sizeof(FileCacheState) + (fcs->n_chunks)*sizeof(BufferTag) + (((fcs->n_chunks) << fcs->chunk_size_log)+7)/8) + +extern void pg_init_prewarm(void); +extern void PrewarmShmemRequest(void); +extern void PrewarmShmemInit(void); + +#endif /* LFC_PREWARM_H */ + + diff --git a/pgxn/neon/neon.c b/pgxn/neon/neon.c index 8efea63e72..59ecd9ab1c 100644 --- a/pgxn/neon/neon.c +++ b/pgxn/neon/neon.c @@ -503,6 +503,7 @@ _PG_init(void) pg_init_libpagestore(); relsize_hash_init(); lfc_init(); + pg_init_prewarm(); pg_init_walproposer(); init_lwlsncache(); @@ -728,6 +729,7 @@ neon_shmem_request_hook(void) #endif LfcShmemRequest(); + PrewarmShmemRequest(); NeonPerfCountersShmemRequest(); PagestoreShmemRequest(); RelsizeCacheShmemRequest(); @@ -752,6 +754,7 @@ neon_shmem_startup_hook(void) LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE); LfcShmemInit(); + PrewarmShmemInit(); NeonPerfCountersShmemInit(); PagestoreShmemInit(); RelsizeCacheShmemInit(); diff --git a/pgxn/neon/neon_pgversioncompat.h b/pgxn/neon/neon_pgversioncompat.h index 288b6dd42f..85646a6dc5 100644 --- a/pgxn/neon/neon_pgversioncompat.h +++ b/pgxn/neon/neon_pgversioncompat.h @@ -76,16 +76,16 @@ InitBufferTag(BufferTag *tag, const RelFileNode *rnode, (tag).rnode = (rinfo); \ } while (false) -#define BufTagGetNRelFileInfo(tag) tag.rnode +#define BufTagGetNRelFileInfo(tag) (tag).rnode #define BufTagGetRelNumber(tagp) ((tagp)->rnode.relNode) -#define BufTagInit(tag, relNumber, forknum, blkno, spcOid, dbOid) \ +#define BufTagInit(tag, rel_number, fork_number, block_number, spc_oid, db_oid) \ do { \ - RelFileNode rnode = { .spcNode = spcOid, .dbNode = dbOid, .relNode = relNumber}; \ - (tag).forkNum = forknum; \ - (tag).blockNum = blkno; \ - (tag).rnode = rnode; \ + RelFileNode rnode = { .spcNode = (spc_oid), .dbNode = (db_oid), .relNode = (rel_number)}; \ + (tag).forkNum = (fork_number); \ + (tag).blockNum = (block_number); \ + (tag).rnode = rnode; \ } while (false) #define InvalidRelFileNumber InvalidOid @@ -137,13 +137,13 @@ InitBufferTag(BufferTag *tag, const RelFileNode *rnode, .relNumber = (tag).relNumber, \ }) -#define BufTagInit(tag, relNumber, forknum, blkno, spcOid, dbOid) \ +#define BufTagInit(tag, rel_number, fork_number, block_number, spc_oid, db_oid) \ do { \ - (tag).forkNum = forknum; \ - (tag).blockNum = blkno; \ - (tag).spcOid = spcOid; \ - (tag).dbOid = dbOid; \ - (tag).relNumber = relNumber; \ + (tag).forkNum = (fork_number); \ + (tag).blockNum = (block_number); \ + (tag).spcOid = (spc_oid); \ + (tag).dbOid = (db_oid); \ + (tag).relNumber = (rel_number); \ } while (false) #define SMgrRelGetRelInfo(reln) \