mirror of
https://github.com/neondatabase/neon.git
synced 2026-06-01 20:40:37 +00:00
Prefetch using background workers
This commit is contained in:
@@ -25,6 +25,7 @@
|
||||
#include "pgstat.h"
|
||||
#include "port/pg_iovec.h"
|
||||
#include "postmaster/bgworker.h"
|
||||
#include "postmaster/interrupt.h"
|
||||
#include RELFILEINFO_HDR
|
||||
#include "storage/buf_internals.h"
|
||||
#include "storage/fd.h"
|
||||
@@ -32,6 +33,8 @@
|
||||
#include "storage/latch.h"
|
||||
#include "storage/lwlock.h"
|
||||
#include "storage/pg_shmem.h"
|
||||
#include "storage/procsignal.h"
|
||||
#include "tcop/tcopprot.h"
|
||||
#include "utils/builtins.h"
|
||||
#include "utils/dynahash.h"
|
||||
#include "utils/guc.h"
|
||||
@@ -137,10 +140,9 @@ typedef struct FileCacheEntry
|
||||
|
||||
typedef struct PrewarmWorkerState
|
||||
{
|
||||
uint32 total_chunks;
|
||||
uint32 curr_chunk;
|
||||
uint32 prewarmed_pages;
|
||||
uint32 skipped_pages;
|
||||
BackgroundWorkerHandle *handle;
|
||||
} PrewarmWorkerState;
|
||||
|
||||
typedef struct FileCacheControl
|
||||
@@ -165,6 +167,13 @@ typedef struct FileCacheControl
|
||||
HyperLogLogState wss_estimation; /* estimation of working set size */
|
||||
ConditionVariable cv[N_COND_VARS]; /* turnstile of condition variables */
|
||||
PrewarmWorkerState prewarm_workers[MAX_PREWARM_WORKERS];
|
||||
size_t n_prewarm_workers;
|
||||
size_t n_prewarm_entries;
|
||||
size_t total_prewarm_pages;
|
||||
size_t prewarm_batch;
|
||||
bool prewarm_active;
|
||||
bool prewarm_canceled;
|
||||
dsm_handle prewarm_lfc_state_handle;
|
||||
} FileCacheControl;
|
||||
|
||||
#define FILE_CACHE_STATE_MAGIC 0xfcfcfcfc
|
||||
@@ -197,7 +206,6 @@ static char *lfc_path;
|
||||
static uint64 lfc_generation;
|
||||
static FileCacheControl *lfc_ctl;
|
||||
static bool lfc_do_prewarm;
|
||||
static bool lfc_prewarm_canceled;
|
||||
static shmem_startup_hook_type prev_shmem_startup_hook;
|
||||
#if PG_VERSION_NUM>=150000
|
||||
static shmem_request_hook_type prev_shmem_request_hook;
|
||||
@@ -661,7 +669,8 @@ lfc_get_state(size_t max_entries)
|
||||
}
|
||||
Assert(i == n_entries);
|
||||
fcs->n_pages = n_pages;
|
||||
elog(LOG, "LFC: save state of %ld chunks", (long)n_entries);
|
||||
Assert(pg_popcount((char*)bitmap, ((n_entries << lfc_chunk_size_log) + 7)/8) == n_pages);
|
||||
elog(LOG, "LFC: save state of %d chunks %d pages", (int)n_entries, (int)n_pages);
|
||||
}
|
||||
|
||||
LWLockRelease(lfc_lock);
|
||||
@@ -674,33 +683,27 @@ lfc_get_state(size_t max_entries)
|
||||
* and avoid race conditions with other backends.
|
||||
*/
|
||||
static void
|
||||
lfc_prewarm(FileCacheState* fcs, uint32 worker_id, uint32 n_workers)
|
||||
lfc_prewarm(FileCacheState* fcs, uint32 n_workers)
|
||||
{
|
||||
size_t snd_idx = 0, rcv_idx = 0;
|
||||
size_t n_sent = 0, n_received = 0;
|
||||
size_t fcs_chunk_size_log;
|
||||
size_t n_entries;
|
||||
size_t max_prefetch_pages;
|
||||
size_t prewarm_batch = Min(lfc_prewarm_batch, readahead_buffer_size);
|
||||
bool save_lfc_store_prefetch_result;
|
||||
PrewarmWorkerState* ws;
|
||||
uint8* bitmap;
|
||||
BufferTag tag;
|
||||
size_t fcs_size;
|
||||
dsm_segment *seg;
|
||||
|
||||
if (!lfc_ensure_opened())
|
||||
return;
|
||||
|
||||
if (prewarm_batch == 0 || n_workers == 0)
|
||||
if (prewarm_batch == 0 || lfc_prewarm_limit == 0 || n_workers == 0)
|
||||
{
|
||||
elog(LOG, "LFC: prewarm is disabled");
|
||||
return;
|
||||
}
|
||||
|
||||
if (worker_id >= MAX_PREWARM_WORKERS || worker_id >= n_workers)
|
||||
if (n_workers > MAX_PREWARM_WORKERS)
|
||||
{
|
||||
elog(ERROR, "LFC: Invalid prewarm worker id: %d", worker_id);
|
||||
elog(ERROR, "LFC: Too much prewarm workers, maximum is %d", MAX_PREWARM_WORKERS);
|
||||
}
|
||||
ws = &lfc_ctl->prewarm_workers[worker_id];
|
||||
|
||||
if (fcs == NULL || fcs->n_chunks == 0)
|
||||
{
|
||||
@@ -713,7 +716,8 @@ lfc_prewarm(FileCacheState* fcs, uint32 worker_id, uint32 n_workers)
|
||||
elog(ERROR, "LFC: Invalid file cache state magic: %X", fcs->magic);
|
||||
}
|
||||
|
||||
if (FILE_CACHE_STATE_SIZE(fcs) != VARSIZE(fcs))
|
||||
fcs_size = VARSIZE(fcs);
|
||||
if (FILE_CACHE_STATE_SIZE(fcs) != fcs_size)
|
||||
{
|
||||
elog(ERROR, "LFC: Invalid file cache state size: %u vs. %u", (unsigned)FILE_CACHE_STATE_SIZE(fcs), VARSIZE(fcs));
|
||||
}
|
||||
@@ -723,8 +727,9 @@ lfc_prewarm(FileCacheState* fcs, uint32 worker_id, uint32 n_workers)
|
||||
{
|
||||
elog(ERROR, "LFC: Invalid chunk size log: %u", fcs->chunk_size_log);
|
||||
}
|
||||
|
||||
n_entries = Min(fcs->n_chunks, lfc_prewarm_limit);
|
||||
bitmap = FILE_CACHE_STATE_BITMAP(fcs);
|
||||
Assert(n_entries != 0);
|
||||
|
||||
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
|
||||
|
||||
@@ -735,104 +740,171 @@ lfc_prewarm(FileCacheState* fcs, uint32 worker_id, uint32 n_workers)
|
||||
LWLockRelease(lfc_lock);
|
||||
return;
|
||||
}
|
||||
max_prefetch_pages = n_entries << fcs_chunk_size_log;
|
||||
|
||||
if (ws->total_chunks != ws->curr_chunk)
|
||||
if (lfc_ctl->prewarm_active)
|
||||
{
|
||||
LWLockRelease(lfc_lock);
|
||||
elog(ERROR, "LFC: skip prewarm because prewarm worker %d is still active", worker_id);
|
||||
elog(ERROR, "LFC: skip prewarm because another prewarm is still active");
|
||||
}
|
||||
/* Initialize fields used to track prewarming progress */
|
||||
ws->total_chunks = n_entries;
|
||||
ws->curr_chunk = 0;
|
||||
ws->prewarmed_pages = 0;
|
||||
ws->skipped_pages = 0;
|
||||
lfc_ctl->n_prewarm_entries = n_entries;
|
||||
lfc_ctl->n_prewarm_workers = n_workers;
|
||||
lfc_ctl->prewarm_active = true;
|
||||
lfc_ctl->prewarm_canceled = false;
|
||||
lfc_ctl->prewarm_batch = prewarm_batch;
|
||||
memset(lfc_ctl->prewarm_workers, 0, n_workers*sizeof(PrewarmWorkerState));
|
||||
|
||||
LWLockRelease(lfc_lock);
|
||||
|
||||
/* enable prefetch in LFC */
|
||||
save_lfc_store_prefetch_result = lfc_store_prefetch_result;
|
||||
lfc_store_prefetch_result = true;
|
||||
/* Calculate total number of pages to be prewarmed */
|
||||
lfc_ctl->total_prewarm_pages = fcs->n_pages;
|
||||
|
||||
lfc_do_prewarm = true; /* Flag for lfc_prefetch preventing replacement of existed entries if LFC ache is full */
|
||||
lfc_prewarm_canceled = false; /* Flag set if prewarm is canceled because LFC limit is reached */
|
||||
seg = dsm_create(fcs_size, 0);
|
||||
memcpy(dsm_segment_address(seg), fcs, fcs_size);
|
||||
lfc_ctl->prewarm_lfc_state_handle = dsm_segment_handle(seg);
|
||||
|
||||
elog(LOG, "LFC: start loading %ld chunks", (long)n_entries);
|
||||
PG_TRY();
|
||||
/* Spawn background workers */
|
||||
for (uint32 i = 0; i < n_workers; i++)
|
||||
{
|
||||
while (true)
|
||||
BackgroundWorker worker = {0};
|
||||
|
||||
worker.bgw_flags = BGWORKER_SHMEM_ACCESS;
|
||||
worker.bgw_start_time = BgWorkerStart_ConsistentState;
|
||||
worker.bgw_restart_time = BGW_NEVER_RESTART;
|
||||
strcpy(worker.bgw_library_name, "neon");
|
||||
strcpy(worker.bgw_function_name, "lfc_prewarm_main");
|
||||
sprintf(worker.bgw_name, "LFC prewarm worker %d", i+1);
|
||||
strcpy(worker.bgw_type, "LFC prewarm worker");
|
||||
worker.bgw_main_arg = Int32GetDatum(i);
|
||||
/* must set notify PID to wait for shutdown */
|
||||
worker.bgw_notify_pid = MyProcPid;
|
||||
|
||||
if (!RegisterDynamicBackgroundWorker(&worker, &lfc_ctl->prewarm_workers[i].handle))
|
||||
{
|
||||
if (snd_idx < max_prefetch_pages && !lfc_prewarm_canceled)
|
||||
ereport(LOG,
|
||||
(errcode(ERRCODE_INSUFFICIENT_RESOURCES),
|
||||
errmsg("registering dynamic bgworker prewarm failed"),
|
||||
errhint("Consider increasing the configuration parameter \"%s\".", "max_worker_processes")));
|
||||
n_workers = i;
|
||||
lfc_ctl->prewarm_canceled = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
for (uint32 i = 0; i < n_workers; i++)
|
||||
{
|
||||
WaitForBackgroundWorkerShutdown(lfc_ctl->prewarm_workers[i].handle);
|
||||
lfc_ctl->prewarm_workers[i].handle = NULL;
|
||||
}
|
||||
dsm_detach(seg);
|
||||
|
||||
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
|
||||
lfc_ctl->prewarm_active = false;
|
||||
LWLockRelease(lfc_lock);
|
||||
}
|
||||
|
||||
PGDLLEXPORT void lfc_prewarm_main(Datum main_arg);
|
||||
|
||||
void
|
||||
lfc_prewarm_main(Datum main_arg)
|
||||
{
|
||||
size_t snd_idx = 0, rcv_idx = 0;
|
||||
size_t n_sent = 0, n_received = 0;
|
||||
size_t fcs_chunk_size_log;
|
||||
size_t max_prefetch_pages;
|
||||
size_t prewarm_batch;
|
||||
size_t n_workers;
|
||||
dsm_segment *seg;
|
||||
FileCacheState* fcs;
|
||||
uint8* bitmap;
|
||||
BufferTag tag;
|
||||
PrewarmWorkerState* ws;
|
||||
uint32 worker_id = DatumGetInt32(main_arg);
|
||||
|
||||
pqsignal(SIGTERM, die);
|
||||
BackgroundWorkerUnblockSignals();
|
||||
|
||||
seg = dsm_attach(lfc_ctl->prewarm_lfc_state_handle);
|
||||
if (seg == NULL)
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
|
||||
errmsg("could not map dynamic shared memory segment")));
|
||||
|
||||
fcs = (FileCacheState*) dsm_segment_address(seg);
|
||||
prewarm_batch = lfc_ctl->prewarm_batch;
|
||||
fcs_chunk_size_log = fcs->chunk_size_log;
|
||||
n_workers = lfc_ctl->n_prewarm_workers;
|
||||
max_prefetch_pages = lfc_ctl->n_prewarm_entries << fcs_chunk_size_log;
|
||||
ws = &lfc_ctl->prewarm_workers[worker_id];
|
||||
bitmap = FILE_CACHE_STATE_BITMAP(fcs);
|
||||
|
||||
/* enable prefetch in LFC */
|
||||
lfc_store_prefetch_result = true;
|
||||
lfc_do_prewarm = true; /* Flag for lfc_prefetch preventing replacement of existed entries if LFC cache is full */
|
||||
|
||||
elog(LOG, "LFC: worker %d start prewarming", worker_id);
|
||||
while (true)
|
||||
{
|
||||
if (snd_idx < max_prefetch_pages && !lfc_ctl->prewarm_canceled)
|
||||
{
|
||||
if ((snd_idx >> fcs_chunk_size_log) % n_workers != worker_id)
|
||||
{
|
||||
if ((snd_idx >> fcs_chunk_size_log) % n_workers != worker_id)
|
||||
{
|
||||
/* If there are multiple workers, split chunks between them */
|
||||
snd_idx += 1 << fcs_chunk_size_log;
|
||||
}
|
||||
else
|
||||
{
|
||||
if (BITMAP_ISSET(bitmap, snd_idx))
|
||||
{
|
||||
tag = fcs->chunks[snd_idx >> fcs_chunk_size_log];
|
||||
tag.blockNum += snd_idx & ((1 << fcs_chunk_size_log) - 1);
|
||||
if (!lfc_cache_contains(BufTagGetNRelFileInfo(tag), tag.forkNum, tag.blockNum))
|
||||
{
|
||||
(void)communicator_prefetch_register_bufferv(tag, NULL, 1, NULL);
|
||||
n_sent += 1;
|
||||
}
|
||||
else
|
||||
{
|
||||
BITMAP_CLR(bitmap, snd_idx);
|
||||
}
|
||||
}
|
||||
snd_idx += 1;
|
||||
}
|
||||
/* If there are multiple workers, split chunks between them */
|
||||
snd_idx += 1 << fcs_chunk_size_log;
|
||||
}
|
||||
if (n_sent >= n_received + prewarm_batch || snd_idx == max_prefetch_pages || lfc_prewarm_canceled)
|
||||
else
|
||||
{
|
||||
if (n_received == n_sent && (snd_idx == max_prefetch_pages || lfc_prewarm_canceled))
|
||||
if (BITMAP_ISSET(bitmap, snd_idx))
|
||||
{
|
||||
break;
|
||||
tag = fcs->chunks[snd_idx >> fcs_chunk_size_log];
|
||||
tag.blockNum += snd_idx & ((1 << fcs_chunk_size_log) - 1);
|
||||
if (!lfc_cache_contains(BufTagGetNRelFileInfo(tag), tag.forkNum, tag.blockNum))
|
||||
{
|
||||
(void)communicator_prefetch_register_bufferv(tag, NULL, 1, NULL);
|
||||
n_sent += 1;
|
||||
}
|
||||
else
|
||||
{
|
||||
ws->skipped_pages += 1;
|
||||
BITMAP_CLR(bitmap, snd_idx);
|
||||
}
|
||||
}
|
||||
if ((rcv_idx >> fcs_chunk_size_log) % n_workers != worker_id)
|
||||
{
|
||||
/* Skip chunks processed by other workers */
|
||||
rcv_idx += 1 << fcs_chunk_size_log;
|
||||
continue;
|
||||
}
|
||||
|
||||
/* Locate next block to prefetch */
|
||||
while (!BITMAP_ISSET(bitmap, rcv_idx))
|
||||
{
|
||||
rcv_idx += 1;
|
||||
}
|
||||
/* Update progress indicator */
|
||||
ws->curr_chunk = rcv_idx >> fcs_chunk_size_log;
|
||||
|
||||
tag = fcs->chunks[rcv_idx >> fcs_chunk_size_log];
|
||||
tag.blockNum += rcv_idx & ((1 << fcs_chunk_size_log) - 1);
|
||||
if (communicator_prefetch_receive(tag))
|
||||
{
|
||||
ws->prewarmed_pages += 1;
|
||||
}
|
||||
else
|
||||
{
|
||||
ws->skipped_pages += 1;
|
||||
}
|
||||
rcv_idx += 1;
|
||||
n_received += 1;
|
||||
snd_idx += 1;
|
||||
}
|
||||
}
|
||||
Assert(n_sent == n_received);
|
||||
elog(LOG, "LFC: worker %d complete prewarming: loaded %ld pages", worker_id, (long)n_received);
|
||||
if (n_sent >= n_received + prewarm_batch || snd_idx == max_prefetch_pages || lfc_ctl->prewarm_canceled)
|
||||
{
|
||||
if (n_received == n_sent && (snd_idx == max_prefetch_pages || lfc_ctl->prewarm_canceled))
|
||||
{
|
||||
break;
|
||||
}
|
||||
if ((rcv_idx >> fcs_chunk_size_log) % n_workers != worker_id)
|
||||
{
|
||||
/* Skip chunks processed by other workers */
|
||||
rcv_idx += 1 << fcs_chunk_size_log;
|
||||
continue;
|
||||
}
|
||||
|
||||
/* Locate next block to prefetch */
|
||||
while (!BITMAP_ISSET(bitmap, rcv_idx))
|
||||
{
|
||||
rcv_idx += 1;
|
||||
}
|
||||
tag = fcs->chunks[rcv_idx >> fcs_chunk_size_log];
|
||||
tag.blockNum += rcv_idx & ((1 << fcs_chunk_size_log) - 1);
|
||||
if (communicator_prefetch_receive(tag))
|
||||
{
|
||||
ws->prewarmed_pages += 1;
|
||||
}
|
||||
else
|
||||
{
|
||||
ws->skipped_pages += 1;
|
||||
}
|
||||
rcv_idx += 1;
|
||||
n_received += 1;
|
||||
}
|
||||
}
|
||||
PG_FINALLY();
|
||||
{
|
||||
lfc_store_prefetch_result = save_lfc_store_prefetch_result;
|
||||
ws->curr_chunk = n_entries;
|
||||
lfc_do_prewarm = false;
|
||||
}
|
||||
PG_END_TRY();
|
||||
Assert(n_sent == n_received);
|
||||
elog(LOG, "LFC: worker %d complete prewarming: loaded %ld pages", worker_id, (long)n_received);
|
||||
}
|
||||
|
||||
|
||||
@@ -1302,7 +1374,7 @@ lfc_init_new_entry(FileCacheEntry* entry, uint32 hash)
|
||||
/* Can't add this chunk - we don't have the space for it */
|
||||
hash_search_with_hash_value(lfc_hash, &entry->key, hash,
|
||||
HASH_REMOVE, NULL);
|
||||
lfc_prewarm_canceled = true; /* cancel prewarm if LFC limit is reached */
|
||||
lfc_ctl->prewarm_canceled = true; /* cancel prewarm if LFC limit is reached */
|
||||
return false;
|
||||
}
|
||||
|
||||
@@ -2045,11 +2117,10 @@ Datum
|
||||
prewarm_local_cache(PG_FUNCTION_ARGS)
|
||||
{
|
||||
bytea* state = PG_GETARG_BYTEA_PP(0);
|
||||
uint32 worker_id = PG_GETARG_INT32(1);
|
||||
uint32 n_workers = PG_GETARG_INT32(2);
|
||||
uint32 n_workers = PG_GETARG_INT32(1);
|
||||
FileCacheState* fcs = (FileCacheState*)state;
|
||||
|
||||
lfc_prewarm(fcs, worker_id, n_workers);
|
||||
lfc_prewarm(fcs, n_workers);
|
||||
|
||||
PG_RETURN_NULL();
|
||||
}
|
||||
@@ -2062,34 +2133,45 @@ get_prewarm_info(PG_FUNCTION_ARGS)
|
||||
Datum values[4];
|
||||
bool nulls[4];
|
||||
TupleDesc tupdesc;
|
||||
uint32 worker_id = PG_GETARG_INT32(0);
|
||||
PrewarmWorkerState* ws;
|
||||
uint32 prewarmed_pages = 0;
|
||||
uint32 skipped_pages = 0;
|
||||
uint32 active_workers = 0;
|
||||
uint32 total_pages;
|
||||
size_t n_workers;
|
||||
|
||||
if (lfc_size_limit == 0)
|
||||
PG_RETURN_NULL();
|
||||
|
||||
if (worker_id >= MAX_PREWARM_WORKERS)
|
||||
LWLockAcquire(lfc_lock, LW_SHARED);
|
||||
if (!lfc_ctl || lfc_ctl->n_prewarm_workers == 0)
|
||||
{
|
||||
elog(ERROR, "LFC: Invalid prewarm worker id: %d", worker_id);
|
||||
LWLockRelease(lfc_lock);
|
||||
PG_RETURN_NULL();
|
||||
}
|
||||
n_workers = lfc_ctl->n_prewarm_workers;
|
||||
total_pages = lfc_ctl->total_prewarm_pages;
|
||||
for (size_t i = 0; i < n_workers; i++)
|
||||
{
|
||||
PrewarmWorkerState* ws = &lfc_ctl->prewarm_workers[i];
|
||||
prewarmed_pages += ws->prewarmed_pages;
|
||||
skipped_pages += ws->skipped_pages;
|
||||
active_workers += ws->handle != NULL;
|
||||
}
|
||||
LWLockRelease(lfc_lock);
|
||||
|
||||
tupdesc = CreateTemplateTupleDesc(4);
|
||||
TupleDescInitEntry(tupdesc, (AttrNumber) 1, "total_chunks", INT4OID, -1, 0);
|
||||
TupleDescInitEntry(tupdesc, (AttrNumber) 2, "curr_chunk", INT4OID, -1, 0);
|
||||
TupleDescInitEntry(tupdesc, (AttrNumber) 3, "prewarmed_pages", INT4OID, -1, 0);
|
||||
TupleDescInitEntry(tupdesc, (AttrNumber) 4, "skipped_pages", INT4OID, -1, 0);
|
||||
TupleDescInitEntry(tupdesc, (AttrNumber) 1, "total_pages", INT4OID, -1, 0);
|
||||
TupleDescInitEntry(tupdesc, (AttrNumber) 2, "prewarmed_pages", INT4OID, -1, 0);
|
||||
TupleDescInitEntry(tupdesc, (AttrNumber) 3, "skipped_pages", INT4OID, -1, 0);
|
||||
TupleDescInitEntry(tupdesc, (AttrNumber) 4, "active_workers", INT4OID, -1, 0);
|
||||
tupdesc = BlessTupleDesc(tupdesc);
|
||||
|
||||
MemSet(nulls, 0, sizeof(nulls));
|
||||
LWLockAcquire(lfc_lock, LW_SHARED);
|
||||
|
||||
ws = &lfc_ctl->prewarm_workers[worker_id];
|
||||
|
||||
values[0] = Int32GetDatum(ws->total_chunks);
|
||||
values[1] = Int32GetDatum(ws->curr_chunk);
|
||||
values[2] = Int32GetDatum(ws->prewarmed_pages);
|
||||
values[3] = Int32GetDatum(ws->skipped_pages);
|
||||
LWLockRelease(lfc_lock);
|
||||
values[0] = Int32GetDatum(total_pages);
|
||||
values[1] = Int32GetDatum(prewarmed_pages);
|
||||
values[2] = Int32GetDatum(skipped_pages);
|
||||
values[3] = Int32GetDatum(active_workers);
|
||||
|
||||
PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
|
||||
}
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
\echo Use "ALTER EXTENSION neon UPDATE TO '1.6'" to load this file. \quit
|
||||
|
||||
CREATE FUNCTION get_prewarm_info(worker_id integer default 0, out total_chunks integer, out curr_chunk integer, out prewarmed_pages integer, out skipped_pages integer)
|
||||
CREATE FUNCTION get_prewarm_info(out total_pages integer, out prewarmed_pages integer, out skipped_pages integer, out active_workers integer)
|
||||
RETURNS record
|
||||
AS 'MODULE_PATHNAME', 'get_prewarm_info'
|
||||
LANGUAGE C STRICT
|
||||
@@ -12,7 +12,7 @@ AS 'MODULE_PATHNAME', 'get_local_cache_state'
|
||||
LANGUAGE C
|
||||
PARALLEL UNSAFE;
|
||||
|
||||
CREATE FUNCTION prewarm_local_cache(state bytea, worker_id integer default 0, n_workers integer default 1)
|
||||
CREATE FUNCTION prewarm_local_cache(state bytea, n_workers integer default 1)
|
||||
RETURNS void
|
||||
AS 'MODULE_PATHNAME', 'prewarm_local_cache'
|
||||
LANGUAGE C STRICT
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
DROP FUNCTION IF EXISTS get_prewarm_info(worker_id integer, out total_chunks integer, out curr_chunk integer, out prewarmed_pages integer, out skipped_pages integer);
|
||||
DROP FUNCTION IF EXISTS get_prewarm_info(out total_pages integer, out prewarmed_pages integer, out skipped_pages integer, out active_workers integer);
|
||||
|
||||
DROP FUNCTION IF EXISTS get_local_cache_state(max_chunks integer);
|
||||
|
||||
DROP FUNCTION IF EXISTS prewarm_local_cache(state bytea, worker_id integer, n_workers integer);
|
||||
DROP FUNCTION IF EXISTS prewarm_local_cache(state bytea, n_workers integer default 1);
|
||||
|
||||
|
||||
|
||||
@@ -57,16 +57,14 @@ def test_lfc_prewarm(neon_simple_env: NeonEnv):
|
||||
cur.execute("select * from get_prewarm_info()")
|
||||
prewarm_info = cur.fetchall()[0]
|
||||
log.info(f"Prewarm info: {prewarm_info}")
|
||||
log.info(f"Prewarm progress: {prewarm_info[1] * 100 // prewarm_info[0]}%")
|
||||
log.info(f"Prewarm progress: {(prewarm_info[1] + prewarm_info[2]) * 100 // prewarm_info[0]}%")
|
||||
|
||||
assert lfc_used_pages > 10000
|
||||
assert prewarm_info[0] > 0 and prewarm_info[0] == prewarm_info[1]
|
||||
assert prewarm_info[0] > 0 and prewarm_info[1] > 0 and prewarm_info[0] == prewarm_info[1] + prewarm_info[2]
|
||||
|
||||
cur.execute("select sum(pk) from t")
|
||||
assert cur.fetchall()[0][0] == n_records * (n_records + 1) / 2
|
||||
|
||||
assert prewarm_info[1] > 0
|
||||
|
||||
check_pinned_entries(cur)
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user