From 05381d1ddaba21af3c9fdc0e5788ecab58fd89a7 Mon Sep 17 00:00:00 2001 From: Konstantin Knizhnik Date: Wed, 23 Apr 2025 23:51:34 +0300 Subject: [PATCH] Prefetch using background workers --- pgxn/neon/file_cache.c | 316 +++++++++++++++--------- pgxn/neon/neon--1.5--1.6.sql | 4 +- pgxn/neon/neon--1.6--1.5.sql | 4 +- test_runner/regress/test_lfc_prewarm.py | 6 +- 4 files changed, 205 insertions(+), 125 deletions(-) diff --git a/pgxn/neon/file_cache.c b/pgxn/neon/file_cache.c index e51e604ee4..bf9bbb383f 100644 --- a/pgxn/neon/file_cache.c +++ b/pgxn/neon/file_cache.c @@ -25,6 +25,7 @@ #include "pgstat.h" #include "port/pg_iovec.h" #include "postmaster/bgworker.h" +#include "postmaster/interrupt.h" #include RELFILEINFO_HDR #include "storage/buf_internals.h" #include "storage/fd.h" @@ -32,6 +33,8 @@ #include "storage/latch.h" #include "storage/lwlock.h" #include "storage/pg_shmem.h" +#include "storage/procsignal.h" +#include "tcop/tcopprot.h" #include "utils/builtins.h" #include "utils/dynahash.h" #include "utils/guc.h" @@ -137,10 +140,9 @@ typedef struct FileCacheEntry typedef struct PrewarmWorkerState { - uint32 total_chunks; - uint32 curr_chunk; uint32 prewarmed_pages; uint32 skipped_pages; + BackgroundWorkerHandle *handle; } PrewarmWorkerState; typedef struct FileCacheControl @@ -165,6 +167,13 @@ typedef struct FileCacheControl HyperLogLogState wss_estimation; /* estimation of working set size */ ConditionVariable cv[N_COND_VARS]; /* turnstile of condition variables */ PrewarmWorkerState prewarm_workers[MAX_PREWARM_WORKERS]; + size_t n_prewarm_workers; + size_t n_prewarm_entries; + size_t total_prewarm_pages; + size_t prewarm_batch; + bool prewarm_active; + bool prewarm_canceled; + dsm_handle prewarm_lfc_state_handle; } FileCacheControl; #define FILE_CACHE_STATE_MAGIC 0xfcfcfcfc @@ -197,7 +206,6 @@ static char *lfc_path; static uint64 lfc_generation; static FileCacheControl *lfc_ctl; static bool lfc_do_prewarm; -static bool lfc_prewarm_canceled; static shmem_startup_hook_type prev_shmem_startup_hook; #if PG_VERSION_NUM>=150000 static shmem_request_hook_type prev_shmem_request_hook; @@ -661,7 +669,8 @@ lfc_get_state(size_t max_entries) } Assert(i == n_entries); fcs->n_pages = n_pages; - elog(LOG, "LFC: save state of %ld chunks", (long)n_entries); + Assert(pg_popcount((char*)bitmap, ((n_entries << lfc_chunk_size_log) + 7)/8) == n_pages); + elog(LOG, "LFC: save state of %d chunks %d pages", (int)n_entries, (int)n_pages); } LWLockRelease(lfc_lock); @@ -674,33 +683,27 @@ lfc_get_state(size_t max_entries) * and avoid race conditions with other backends. */ static void -lfc_prewarm(FileCacheState* fcs, uint32 worker_id, uint32 n_workers) +lfc_prewarm(FileCacheState* fcs, uint32 n_workers) { - size_t snd_idx = 0, rcv_idx = 0; - size_t n_sent = 0, n_received = 0; size_t fcs_chunk_size_log; size_t n_entries; - size_t max_prefetch_pages; size_t prewarm_batch = Min(lfc_prewarm_batch, readahead_buffer_size); - bool save_lfc_store_prefetch_result; - PrewarmWorkerState* ws; - uint8* bitmap; - BufferTag tag; + size_t fcs_size; + dsm_segment *seg; if (!lfc_ensure_opened()) return; - if (prewarm_batch == 0 || n_workers == 0) + if (prewarm_batch == 0 || lfc_prewarm_limit == 0 || n_workers == 0) { elog(LOG, "LFC: prewarm is disabled"); return; } - if (worker_id >= MAX_PREWARM_WORKERS || worker_id >= n_workers) + if (n_workers > MAX_PREWARM_WORKERS) { - elog(ERROR, "LFC: Invalid prewarm worker id: %d", worker_id); + elog(ERROR, "LFC: Too much prewarm workers, maximum is %d", MAX_PREWARM_WORKERS); } - ws = &lfc_ctl->prewarm_workers[worker_id]; if (fcs == NULL || fcs->n_chunks == 0) { @@ -713,7 +716,8 @@ lfc_prewarm(FileCacheState* fcs, uint32 worker_id, uint32 n_workers) elog(ERROR, "LFC: Invalid file cache state magic: %X", fcs->magic); } - if (FILE_CACHE_STATE_SIZE(fcs) != VARSIZE(fcs)) + fcs_size = VARSIZE(fcs); + if (FILE_CACHE_STATE_SIZE(fcs) != fcs_size) { elog(ERROR, "LFC: Invalid file cache state size: %u vs. %u", (unsigned)FILE_CACHE_STATE_SIZE(fcs), VARSIZE(fcs)); } @@ -723,8 +727,9 @@ lfc_prewarm(FileCacheState* fcs, uint32 worker_id, uint32 n_workers) { elog(ERROR, "LFC: Invalid chunk size log: %u", fcs->chunk_size_log); } + n_entries = Min(fcs->n_chunks, lfc_prewarm_limit); - bitmap = FILE_CACHE_STATE_BITMAP(fcs); + Assert(n_entries != 0); LWLockAcquire(lfc_lock, LW_EXCLUSIVE); @@ -735,104 +740,171 @@ lfc_prewarm(FileCacheState* fcs, uint32 worker_id, uint32 n_workers) LWLockRelease(lfc_lock); return; } - max_prefetch_pages = n_entries << fcs_chunk_size_log; - if (ws->total_chunks != ws->curr_chunk) + if (lfc_ctl->prewarm_active) { LWLockRelease(lfc_lock); - elog(ERROR, "LFC: skip prewarm because prewarm worker %d is still active", worker_id); + elog(ERROR, "LFC: skip prewarm because another prewarm is still active"); } - /* Initialize fields used to track prewarming progress */ - ws->total_chunks = n_entries; - ws->curr_chunk = 0; - ws->prewarmed_pages = 0; - ws->skipped_pages = 0; + lfc_ctl->n_prewarm_entries = n_entries; + lfc_ctl->n_prewarm_workers = n_workers; + lfc_ctl->prewarm_active = true; + lfc_ctl->prewarm_canceled = false; + lfc_ctl->prewarm_batch = prewarm_batch; + memset(lfc_ctl->prewarm_workers, 0, n_workers*sizeof(PrewarmWorkerState)); LWLockRelease(lfc_lock); - /* enable prefetch in LFC */ - save_lfc_store_prefetch_result = lfc_store_prefetch_result; - lfc_store_prefetch_result = true; + /* Calculate total number of pages to be prewarmed */ + lfc_ctl->total_prewarm_pages = fcs->n_pages; - lfc_do_prewarm = true; /* Flag for lfc_prefetch preventing replacement of existed entries if LFC ache is full */ - lfc_prewarm_canceled = false; /* Flag set if prewarm is canceled because LFC limit is reached */ + seg = dsm_create(fcs_size, 0); + memcpy(dsm_segment_address(seg), fcs, fcs_size); + lfc_ctl->prewarm_lfc_state_handle = dsm_segment_handle(seg); - elog(LOG, "LFC: start loading %ld chunks", (long)n_entries); - PG_TRY(); + /* Spawn background workers */ + for (uint32 i = 0; i < n_workers; i++) { - while (true) + BackgroundWorker worker = {0}; + + worker.bgw_flags = BGWORKER_SHMEM_ACCESS; + worker.bgw_start_time = BgWorkerStart_ConsistentState; + worker.bgw_restart_time = BGW_NEVER_RESTART; + strcpy(worker.bgw_library_name, "neon"); + strcpy(worker.bgw_function_name, "lfc_prewarm_main"); + sprintf(worker.bgw_name, "LFC prewarm worker %d", i+1); + strcpy(worker.bgw_type, "LFC prewarm worker"); + worker.bgw_main_arg = Int32GetDatum(i); + /* must set notify PID to wait for shutdown */ + worker.bgw_notify_pid = MyProcPid; + + if (!RegisterDynamicBackgroundWorker(&worker, &lfc_ctl->prewarm_workers[i].handle)) { - if (snd_idx < max_prefetch_pages && !lfc_prewarm_canceled) + ereport(LOG, + (errcode(ERRCODE_INSUFFICIENT_RESOURCES), + errmsg("registering dynamic bgworker prewarm failed"), + errhint("Consider increasing the configuration parameter \"%s\".", "max_worker_processes"))); + n_workers = i; + lfc_ctl->prewarm_canceled = true; + break; + } + } + + for (uint32 i = 0; i < n_workers; i++) + { + WaitForBackgroundWorkerShutdown(lfc_ctl->prewarm_workers[i].handle); + lfc_ctl->prewarm_workers[i].handle = NULL; + } + dsm_detach(seg); + + LWLockAcquire(lfc_lock, LW_EXCLUSIVE); + lfc_ctl->prewarm_active = false; + LWLockRelease(lfc_lock); +} + +PGDLLEXPORT void lfc_prewarm_main(Datum main_arg); + +void +lfc_prewarm_main(Datum main_arg) +{ + size_t snd_idx = 0, rcv_idx = 0; + size_t n_sent = 0, n_received = 0; + size_t fcs_chunk_size_log; + size_t max_prefetch_pages; + size_t prewarm_batch; + size_t n_workers; + dsm_segment *seg; + FileCacheState* fcs; + uint8* bitmap; + BufferTag tag; + PrewarmWorkerState* ws; + uint32 worker_id = DatumGetInt32(main_arg); + + pqsignal(SIGTERM, die); + BackgroundWorkerUnblockSignals(); + + seg = dsm_attach(lfc_ctl->prewarm_lfc_state_handle); + if (seg == NULL) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("could not map dynamic shared memory segment"))); + + fcs = (FileCacheState*) dsm_segment_address(seg); + prewarm_batch = lfc_ctl->prewarm_batch; + fcs_chunk_size_log = fcs->chunk_size_log; + n_workers = lfc_ctl->n_prewarm_workers; + max_prefetch_pages = lfc_ctl->n_prewarm_entries << fcs_chunk_size_log; + ws = &lfc_ctl->prewarm_workers[worker_id]; + bitmap = FILE_CACHE_STATE_BITMAP(fcs); + + /* enable prefetch in LFC */ + lfc_store_prefetch_result = true; + lfc_do_prewarm = true; /* Flag for lfc_prefetch preventing replacement of existed entries if LFC cache is full */ + + elog(LOG, "LFC: worker %d start prewarming", worker_id); + while (true) + { + if (snd_idx < max_prefetch_pages && !lfc_ctl->prewarm_canceled) + { + if ((snd_idx >> fcs_chunk_size_log) % n_workers != worker_id) { - if ((snd_idx >> fcs_chunk_size_log) % n_workers != worker_id) - { - /* If there are multiple workers, split chunks between them */ - snd_idx += 1 << fcs_chunk_size_log; - } - else - { - if (BITMAP_ISSET(bitmap, snd_idx)) - { - tag = fcs->chunks[snd_idx >> fcs_chunk_size_log]; - tag.blockNum += snd_idx & ((1 << fcs_chunk_size_log) - 1); - if (!lfc_cache_contains(BufTagGetNRelFileInfo(tag), tag.forkNum, tag.blockNum)) - { - (void)communicator_prefetch_register_bufferv(tag, NULL, 1, NULL); - n_sent += 1; - } - else - { - BITMAP_CLR(bitmap, snd_idx); - } - } - snd_idx += 1; - } + /* If there are multiple workers, split chunks between them */ + snd_idx += 1 << fcs_chunk_size_log; } - if (n_sent >= n_received + prewarm_batch || snd_idx == max_prefetch_pages || lfc_prewarm_canceled) + else { - if (n_received == n_sent && (snd_idx == max_prefetch_pages || lfc_prewarm_canceled)) + if (BITMAP_ISSET(bitmap, snd_idx)) { - break; + tag = fcs->chunks[snd_idx >> fcs_chunk_size_log]; + tag.blockNum += snd_idx & ((1 << fcs_chunk_size_log) - 1); + if (!lfc_cache_contains(BufTagGetNRelFileInfo(tag), tag.forkNum, tag.blockNum)) + { + (void)communicator_prefetch_register_bufferv(tag, NULL, 1, NULL); + n_sent += 1; + } + else + { + ws->skipped_pages += 1; + BITMAP_CLR(bitmap, snd_idx); + } } - if ((rcv_idx >> fcs_chunk_size_log) % n_workers != worker_id) - { - /* Skip chunks processed by other workers */ - rcv_idx += 1 << fcs_chunk_size_log; - continue; - } - - /* Locate next block to prefetch */ - while (!BITMAP_ISSET(bitmap, rcv_idx)) - { - rcv_idx += 1; - } - /* Update progress indicator */ - ws->curr_chunk = rcv_idx >> fcs_chunk_size_log; - - tag = fcs->chunks[rcv_idx >> fcs_chunk_size_log]; - tag.blockNum += rcv_idx & ((1 << fcs_chunk_size_log) - 1); - if (communicator_prefetch_receive(tag)) - { - ws->prewarmed_pages += 1; - } - else - { - ws->skipped_pages += 1; - } - rcv_idx += 1; - n_received += 1; + snd_idx += 1; } } - Assert(n_sent == n_received); - elog(LOG, "LFC: worker %d complete prewarming: loaded %ld pages", worker_id, (long)n_received); + if (n_sent >= n_received + prewarm_batch || snd_idx == max_prefetch_pages || lfc_ctl->prewarm_canceled) + { + if (n_received == n_sent && (snd_idx == max_prefetch_pages || lfc_ctl->prewarm_canceled)) + { + break; + } + if ((rcv_idx >> fcs_chunk_size_log) % n_workers != worker_id) + { + /* Skip chunks processed by other workers */ + rcv_idx += 1 << fcs_chunk_size_log; + continue; + } + + /* Locate next block to prefetch */ + while (!BITMAP_ISSET(bitmap, rcv_idx)) + { + rcv_idx += 1; + } + tag = fcs->chunks[rcv_idx >> fcs_chunk_size_log]; + tag.blockNum += rcv_idx & ((1 << fcs_chunk_size_log) - 1); + if (communicator_prefetch_receive(tag)) + { + ws->prewarmed_pages += 1; + } + else + { + ws->skipped_pages += 1; + } + rcv_idx += 1; + n_received += 1; + } } - PG_FINALLY(); - { - lfc_store_prefetch_result = save_lfc_store_prefetch_result; - ws->curr_chunk = n_entries; - lfc_do_prewarm = false; - } - PG_END_TRY(); + Assert(n_sent == n_received); + elog(LOG, "LFC: worker %d complete prewarming: loaded %ld pages", worker_id, (long)n_received); } @@ -1302,7 +1374,7 @@ lfc_init_new_entry(FileCacheEntry* entry, uint32 hash) /* Can't add this chunk - we don't have the space for it */ hash_search_with_hash_value(lfc_hash, &entry->key, hash, HASH_REMOVE, NULL); - lfc_prewarm_canceled = true; /* cancel prewarm if LFC limit is reached */ + lfc_ctl->prewarm_canceled = true; /* cancel prewarm if LFC limit is reached */ return false; } @@ -2045,11 +2117,10 @@ Datum prewarm_local_cache(PG_FUNCTION_ARGS) { bytea* state = PG_GETARG_BYTEA_PP(0); - uint32 worker_id = PG_GETARG_INT32(1); - uint32 n_workers = PG_GETARG_INT32(2); + uint32 n_workers = PG_GETARG_INT32(1); FileCacheState* fcs = (FileCacheState*)state; - lfc_prewarm(fcs, worker_id, n_workers); + lfc_prewarm(fcs, n_workers); PG_RETURN_NULL(); } @@ -2062,34 +2133,45 @@ get_prewarm_info(PG_FUNCTION_ARGS) Datum values[4]; bool nulls[4]; TupleDesc tupdesc; - uint32 worker_id = PG_GETARG_INT32(0); - PrewarmWorkerState* ws; + uint32 prewarmed_pages = 0; + uint32 skipped_pages = 0; + uint32 active_workers = 0; + uint32 total_pages; + size_t n_workers; if (lfc_size_limit == 0) PG_RETURN_NULL(); - if (worker_id >= MAX_PREWARM_WORKERS) + LWLockAcquire(lfc_lock, LW_SHARED); + if (!lfc_ctl || lfc_ctl->n_prewarm_workers == 0) { - elog(ERROR, "LFC: Invalid prewarm worker id: %d", worker_id); + LWLockRelease(lfc_lock); + PG_RETURN_NULL(); } + n_workers = lfc_ctl->n_prewarm_workers; + total_pages = lfc_ctl->total_prewarm_pages; + for (size_t i = 0; i < n_workers; i++) + { + PrewarmWorkerState* ws = &lfc_ctl->prewarm_workers[i]; + prewarmed_pages += ws->prewarmed_pages; + skipped_pages += ws->skipped_pages; + active_workers += ws->handle != NULL; + } + LWLockRelease(lfc_lock); tupdesc = CreateTemplateTupleDesc(4); - TupleDescInitEntry(tupdesc, (AttrNumber) 1, "total_chunks", INT4OID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 2, "curr_chunk", INT4OID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 3, "prewarmed_pages", INT4OID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 4, "skipped_pages", INT4OID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 1, "total_pages", INT4OID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 2, "prewarmed_pages", INT4OID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 3, "skipped_pages", INT4OID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 4, "active_workers", INT4OID, -1, 0); tupdesc = BlessTupleDesc(tupdesc); MemSet(nulls, 0, sizeof(nulls)); - LWLockAcquire(lfc_lock, LW_SHARED); - ws = &lfc_ctl->prewarm_workers[worker_id]; - - values[0] = Int32GetDatum(ws->total_chunks); - values[1] = Int32GetDatum(ws->curr_chunk); - values[2] = Int32GetDatum(ws->prewarmed_pages); - values[3] = Int32GetDatum(ws->skipped_pages); - LWLockRelease(lfc_lock); + values[0] = Int32GetDatum(total_pages); + values[1] = Int32GetDatum(prewarmed_pages); + values[2] = Int32GetDatum(skipped_pages); + values[3] = Int32GetDatum(active_workers); PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls))); } diff --git a/pgxn/neon/neon--1.5--1.6.sql b/pgxn/neon/neon--1.5--1.6.sql index f44b1ef76b..c05f0f87aa 100644 --- a/pgxn/neon/neon--1.5--1.6.sql +++ b/pgxn/neon/neon--1.5--1.6.sql @@ -1,6 +1,6 @@ \echo Use "ALTER EXTENSION neon UPDATE TO '1.6'" to load this file. \quit -CREATE FUNCTION get_prewarm_info(worker_id integer default 0, out total_chunks integer, out curr_chunk integer, out prewarmed_pages integer, out skipped_pages integer) +CREATE FUNCTION get_prewarm_info(out total_pages integer, out prewarmed_pages integer, out skipped_pages integer, out active_workers integer) RETURNS record AS 'MODULE_PATHNAME', 'get_prewarm_info' LANGUAGE C STRICT @@ -12,7 +12,7 @@ AS 'MODULE_PATHNAME', 'get_local_cache_state' LANGUAGE C PARALLEL UNSAFE; -CREATE FUNCTION prewarm_local_cache(state bytea, worker_id integer default 0, n_workers integer default 1) +CREATE FUNCTION prewarm_local_cache(state bytea, n_workers integer default 1) RETURNS void AS 'MODULE_PATHNAME', 'prewarm_local_cache' LANGUAGE C STRICT diff --git a/pgxn/neon/neon--1.6--1.5.sql b/pgxn/neon/neon--1.6--1.5.sql index db7b524c57..57512980f5 100644 --- a/pgxn/neon/neon--1.6--1.5.sql +++ b/pgxn/neon/neon--1.6--1.5.sql @@ -1,7 +1,7 @@ -DROP FUNCTION IF EXISTS get_prewarm_info(worker_id integer, out total_chunks integer, out curr_chunk integer, out prewarmed_pages integer, out skipped_pages integer); +DROP FUNCTION IF EXISTS get_prewarm_info(out total_pages integer, out prewarmed_pages integer, out skipped_pages integer, out active_workers integer); DROP FUNCTION IF EXISTS get_local_cache_state(max_chunks integer); -DROP FUNCTION IF EXISTS prewarm_local_cache(state bytea, worker_id integer, n_workers integer); +DROP FUNCTION IF EXISTS prewarm_local_cache(state bytea, n_workers integer default 1); diff --git a/test_runner/regress/test_lfc_prewarm.py b/test_runner/regress/test_lfc_prewarm.py index ff60afbafc..ff052ebd93 100644 --- a/test_runner/regress/test_lfc_prewarm.py +++ b/test_runner/regress/test_lfc_prewarm.py @@ -57,16 +57,14 @@ def test_lfc_prewarm(neon_simple_env: NeonEnv): cur.execute("select * from get_prewarm_info()") prewarm_info = cur.fetchall()[0] log.info(f"Prewarm info: {prewarm_info}") - log.info(f"Prewarm progress: {prewarm_info[1] * 100 // prewarm_info[0]}%") + log.info(f"Prewarm progress: {(prewarm_info[1] + prewarm_info[2]) * 100 // prewarm_info[0]}%") assert lfc_used_pages > 10000 - assert prewarm_info[0] > 0 and prewarm_info[0] == prewarm_info[1] + assert prewarm_info[0] > 0 and prewarm_info[1] > 0 and prewarm_info[0] == prewarm_info[1] + prewarm_info[2] cur.execute("select sum(pk) from t") assert cur.fetchall()[0][0] == n_records * (n_records + 1) / 2 - assert prewarm_info[1] > 0 - check_pinned_entries(cur)