mirror of
https://github.com/neondatabase/neon.git
synced 2026-06-03 05:20:38 +00:00
Implement LFC prewarm
This commit is contained in:
@@ -345,6 +345,7 @@ impl AuxFileV2 {
|
||||
AuxFileV2::Recognized("pg_logical/replorigin_checkpoint", hash)
|
||||
}
|
||||
(2, 1) => AuxFileV2::Recognized("pg_replslot/", hash),
|
||||
(4, 1) => AuxFileV2::Recognized("lfc.state", hash),
|
||||
(1, 0xff) => AuxFileV2::OtherWithPrefix("pg_logical/", hash),
|
||||
(0xff, 0xff) => AuxFileV2::Other(hash),
|
||||
_ => return None,
|
||||
|
||||
@@ -39,6 +39,7 @@ fn aux_hash_to_metadata_key(dir_level1: u8, dir_level2: u8, data: &[u8]) -> Key
|
||||
|
||||
const AUX_DIR_PG_LOGICAL: u8 = 0x01;
|
||||
const AUX_DIR_PG_REPLSLOT: u8 = 0x02;
|
||||
const AUX_DIR_LFC_STATE: u8 = 0x04;
|
||||
const AUX_DIR_PG_UNKNOWN: u8 = 0xFF;
|
||||
|
||||
/// Encode the aux file into a fixed-size key.
|
||||
@@ -75,6 +76,8 @@ pub fn encode_aux_file_key(path: &str) -> Key {
|
||||
aux_hash_to_metadata_key(AUX_DIR_PG_LOGICAL, 0xFF, fname.as_bytes())
|
||||
} else if let Some(fname) = path.strip_prefix("pg_replslot/") {
|
||||
aux_hash_to_metadata_key(AUX_DIR_PG_REPLSLOT, 0x01, fname.as_bytes())
|
||||
} else if let Some(fname) = path.strip_prefix("lfc.state") {
|
||||
aux_hash_to_metadata_key(AUX_DIR_LFC_STATE, 0x01, fname.as_bytes())
|
||||
} else {
|
||||
if cfg!(debug_assertions) {
|
||||
warn!(
|
||||
|
||||
@@ -22,6 +22,7 @@
|
||||
#include "neon_pgversioncompat.h"
|
||||
|
||||
#include "access/parallel.h"
|
||||
#include "access/xlog.h"
|
||||
#include "funcapi.h"
|
||||
#include "miscadmin.h"
|
||||
#include "pagestore_client.h"
|
||||
@@ -30,12 +31,14 @@
|
||||
#include "port/pg_iovec.h"
|
||||
#include "postmaster/bgworker.h"
|
||||
#include RELFILEINFO_HDR
|
||||
#include "replication/message.h"
|
||||
#include "storage/buf_internals.h"
|
||||
#include "storage/fd.h"
|
||||
#include "storage/ipc.h"
|
||||
#include "storage/latch.h"
|
||||
#include "storage/lwlock.h"
|
||||
#include "storage/pg_shmem.h"
|
||||
#include "tcop/tcopprot.h"
|
||||
#include "utils/builtins.h"
|
||||
#include "utils/dynahash.h"
|
||||
#include "utils/guc.h"
|
||||
@@ -45,7 +48,7 @@
|
||||
#include "neon.h"
|
||||
#include "neon_perf_counters.h"
|
||||
|
||||
#define CriticalAssert(cond) do if (!(cond)) elog(PANIC, "Assertion %s failed at %s:%d: ", #cond, __FILE__, __LINE__); while (0)
|
||||
#define CriticalAssert(cond) do if (!(cond)) elog(PANIC, "LFC: assertion %s failed at %s:%d: ", #cond, __FILE__, __LINE__); while (0)
|
||||
|
||||
/*
|
||||
* Local file cache is used to temporary store relations pages in local file system.
|
||||
@@ -100,7 +103,9 @@ typedef struct FileCacheEntry
|
||||
BufferTag key;
|
||||
uint32 hash;
|
||||
uint32 offset;
|
||||
uint32 access_count;
|
||||
uint32 access_count : 30;
|
||||
uint32 prewarm_requested : 1; /* entry should be filled by prewarm */
|
||||
uint32 prewarm_started : 1; /* chunk is written by prewarm BGW */
|
||||
uint32 bitmap[CHUNK_BITMAP_SIZE];
|
||||
dlist_node list_node; /* LRU/holes list node */
|
||||
} FileCacheEntry;
|
||||
@@ -124,20 +129,49 @@ typedef struct FileCacheControl
|
||||
HyperLogLogState wss_estimation; /* estimation of working set size */
|
||||
} FileCacheControl;
|
||||
|
||||
typedef struct FileCacheStateEntry
|
||||
{
|
||||
BufferTag key;
|
||||
uint32 bitmap[CHUNK_BITMAP_SIZE];
|
||||
} FileCacheStateEntry;
|
||||
|
||||
static FileCacheStateEntry* lfc_state;
|
||||
static size_t lfc_state_size;
|
||||
static HTAB *lfc_hash;
|
||||
static int lfc_desc = 0;
|
||||
static LWLockId lfc_lock;
|
||||
static int lfc_max_size;
|
||||
static int lfc_size_limit;
|
||||
static int lfc_prewarm_limit;
|
||||
static int lfc_prewarm_batch;
|
||||
static char *lfc_path;
|
||||
static FileCacheControl *lfc_ctl;
|
||||
static shmem_startup_hook_type prev_shmem_startup_hook;
|
||||
#if PG_VERSION_NUM>=150000
|
||||
static shmem_request_hook_type prev_shmem_request_hook;
|
||||
#endif
|
||||
static CustomCheckpointHookType PrevCheckpointHook;
|
||||
|
||||
static void lfc_init_prewarm(void);
|
||||
|
||||
#define LFC_ENABLED() (lfc_ctl->limit != 0)
|
||||
|
||||
|
||||
static void
|
||||
LfcCheckpointHook(int flags)
|
||||
{
|
||||
if (flags & CHECKPOINT_IS_SHUTDOWN)
|
||||
{
|
||||
lfc_save_state();
|
||||
}
|
||||
|
||||
if (PrevCheckpointHook)
|
||||
{
|
||||
PrevCheckpointHook(flags);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* Local file cache is optional and Neon can work without it.
|
||||
* In case of any any errors with this cache, we should disable it but to not throw error.
|
||||
@@ -149,7 +183,7 @@ lfc_disable(char const *op)
|
||||
{
|
||||
int fd;
|
||||
|
||||
elog(WARNING, "Failed to %s local file cache at %s: %m, disabling local file cache", op, lfc_path);
|
||||
elog(WARNING, "LFC: failed to %s local file cache at %s: %m, disabling local file cache", op, lfc_path);
|
||||
|
||||
/* Invalidate hash */
|
||||
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
|
||||
@@ -184,7 +218,7 @@ lfc_disable(char const *op)
|
||||
pgstat_report_wait_end();
|
||||
|
||||
if (rc < 0)
|
||||
elog(WARNING, "Failed to truncate local file cache %s: %m", lfc_path);
|
||||
elog(WARNING, "LFC: failed to truncate local file cache %s: %m", lfc_path);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -196,7 +230,7 @@ lfc_disable(char const *op)
|
||||
|
||||
fd = BasicOpenFile(lfc_path, O_RDWR | O_CREAT | O_TRUNC);
|
||||
if (fd < 0)
|
||||
elog(WARNING, "Failed to recreate local file cache %s: %m", lfc_path);
|
||||
elog(WARNING, "LFC: failed to recreate local file cache %s: %m", lfc_path);
|
||||
else
|
||||
close(fd);
|
||||
|
||||
@@ -236,6 +270,17 @@ lfc_ensure_opened(void)
|
||||
return enabled;
|
||||
}
|
||||
|
||||
PGDLLEXPORT void
|
||||
LfcPrewarmMain(Datum main_arg)
|
||||
{
|
||||
pqsignal(SIGTERM, die);
|
||||
|
||||
BackgroundWorkerUnblockSignals();
|
||||
|
||||
lfc_load_pages();
|
||||
}
|
||||
|
||||
|
||||
static void
|
||||
lfc_shmem_startup(void)
|
||||
{
|
||||
@@ -285,16 +330,24 @@ lfc_shmem_startup(void)
|
||||
fd = BasicOpenFile(lfc_path, O_RDWR | O_CREAT | O_TRUNC);
|
||||
if (fd < 0)
|
||||
{
|
||||
elog(WARNING, "Failed to create local file cache %s: %m", lfc_path);
|
||||
elog(WARNING, "LFC: failed to create local file cache %s: %m", lfc_path);
|
||||
lfc_ctl->limit = 0;
|
||||
}
|
||||
else
|
||||
{
|
||||
close(fd);
|
||||
lfc_ctl->limit = SIZE_MB_TO_CHUNKS(lfc_size_limit);
|
||||
/* Prewarming of replica has no sense because if WAL record's target page is not present in shared buffer, then correspondent LFC entry is invalidated */
|
||||
if (LFC_ENABLED() && lfc_prewarm_limit != 0/* && !RecoveryInProgress()*/)
|
||||
{
|
||||
lfc_init_prewarm();
|
||||
}
|
||||
}
|
||||
}
|
||||
LWLockRelease(AddinShmemInitLock);
|
||||
|
||||
PrevCheckpointHook = CustomCheckpointHook;
|
||||
CustomCheckpointHook = LfcCheckpointHook;
|
||||
}
|
||||
|
||||
static void
|
||||
@@ -327,7 +380,7 @@ lfc_check_limit_hook(int *newval, void **extra, GucSource source)
|
||||
{
|
||||
if (*newval > lfc_max_size)
|
||||
{
|
||||
elog(ERROR, "neon.file_cache_size_limit can not be larger than neon.max_file_cache_size");
|
||||
elog(ERROR, "LFC: neon.file_cache_size_limit can not be larger than neon.max_file_cache_size");
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
@@ -436,6 +489,32 @@ lfc_init(void)
|
||||
NULL,
|
||||
NULL);
|
||||
|
||||
DefineCustomIntVariable("neon.file_cache_prewarm_limit",
|
||||
"Maximal number of prewarmed pages",
|
||||
NULL,
|
||||
&lfc_prewarm_limit,
|
||||
0, /* disabled by default */
|
||||
0,
|
||||
INT_MAX,
|
||||
PGC_SIGHUP,
|
||||
0,
|
||||
NULL,
|
||||
NULL,
|
||||
NULL);
|
||||
|
||||
DefineCustomIntVariable("neon.file_cache_prewarm_batch",
|
||||
"Number of pages retrivied by prewarm from page server",
|
||||
NULL,
|
||||
&lfc_prewarm_batch,
|
||||
64,
|
||||
1,
|
||||
INT_MAX,
|
||||
PGC_SIGHUP,
|
||||
0,
|
||||
NULL,
|
||||
NULL,
|
||||
NULL);
|
||||
|
||||
if (lfc_max_size == 0)
|
||||
return;
|
||||
|
||||
@@ -447,8 +526,277 @@ lfc_init(void)
|
||||
#else
|
||||
lfc_shmem_request();
|
||||
#endif
|
||||
|
||||
if (lfc_prewarm_limit != 0)
|
||||
{
|
||||
BackgroundWorker bgw;
|
||||
memset(&bgw, 0, sizeof(bgw));
|
||||
bgw.bgw_flags = BGWORKER_SHMEM_ACCESS;
|
||||
bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
|
||||
snprintf(bgw.bgw_library_name, BGW_MAXLEN, "neon");
|
||||
snprintf(bgw.bgw_function_name, BGW_MAXLEN, "LfcPrewarmMain");
|
||||
snprintf(bgw.bgw_name, BGW_MAXLEN, "LFC prewarm");
|
||||
snprintf(bgw.bgw_type, BGW_MAXLEN, "LFC prewarm");
|
||||
|
||||
RegisterBackgroundWorker(&bgw);
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Save state of local file cache as AUX file. Size of saved state is limited by lfc_prewarm_limit.
|
||||
* This function saves first mostrecently used pages.
|
||||
* It is expected to be called at shutdown checkpoint by checkpointer.
|
||||
*/
|
||||
void
|
||||
lfc_save_state(void)
|
||||
{
|
||||
size_t i = 0, max_entries = lfc_prewarm_limit;
|
||||
FileCacheStateEntry* fs;
|
||||
|
||||
if (max_entries == 0)
|
||||
return;
|
||||
|
||||
fs = (FileCacheStateEntry*)malloc(sizeof(FileCacheStateEntry) * max_entries);
|
||||
|
||||
LWLockAcquire(lfc_lock, LW_SHARED);
|
||||
if (LFC_ENABLED())
|
||||
{
|
||||
dlist_iter iter;
|
||||
dlist_reverse_foreach(iter, &lfc_ctl->lru)
|
||||
{
|
||||
FileCacheEntry *entry = dlist_container(FileCacheEntry, list_node, iter.cur);
|
||||
memcpy(&fs[i].key, &entry->key, sizeof entry->key);
|
||||
memcpy(fs[i].bitmap, entry->bitmap, sizeof entry->bitmap);
|
||||
if (++i == max_entries)
|
||||
break;
|
||||
}
|
||||
elog(LOG, "LFC: save state of %ld chunks", (long)i);
|
||||
}
|
||||
LWLockRelease(lfc_lock);
|
||||
|
||||
if (i != 0)
|
||||
{
|
||||
#if PG_MAJORVERSION_NUM < 17
|
||||
XLogFlush(LogLogicalMessage("neon-file:lfc.state", (char const*)fs, sizeof(FileCacheStateEntry) * i, false));
|
||||
#else
|
||||
LogLogicalMessage("neon-file:lfc.state", (char const*)fs, sizeof(FileCacheStateEntry) * i, false, true);
|
||||
#endif
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Load LFC state and enter entries in hash table.
|
||||
* It is needed to track modification of prewarmed pages.
|
||||
* All such entries have `prewarm` flag set. When entry is updated (some backed reads or writes
|
||||
* some pages from this chunk), then `prewarm` flag is cleared, prohibiting prefetch for this chunk.
|
||||
* It prevents overwritting page updated or loaded by backend with older one, loaded by prewarm.
|
||||
* This function is called while LFC initialization: no synchronization is needed.
|
||||
*/
|
||||
static void
|
||||
lfc_init_prewarm(void)
|
||||
{
|
||||
FileCacheStateEntry* fs;
|
||||
ssize_t rc;
|
||||
size_t i, max_entries = lfc_prewarm_limit;
|
||||
uint32_t hash;
|
||||
FileCacheEntry *entry;
|
||||
int fd = OpenTransientFile("lfc.state", O_RDONLY | PG_BINARY);
|
||||
|
||||
if (fd < 0)
|
||||
{
|
||||
elog(LOG, "LFC: state file is missing");
|
||||
return;
|
||||
}
|
||||
|
||||
fs = (FileCacheStateEntry*)malloc(sizeof(FileCacheStateEntry) * max_entries);
|
||||
rc = read(fd, fs, sizeof(FileCacheStateEntry) * max_entries);
|
||||
if (rc <= 0)
|
||||
{
|
||||
elog(LOG, "LFC: Failed to read state file: %m");
|
||||
CloseTransientFile(fd);
|
||||
free(fs);
|
||||
return;
|
||||
}
|
||||
CloseTransientFile(fd);
|
||||
|
||||
/* Do not try to load more than fits in LFC */
|
||||
max_entries = Min(rc / sizeof(FileCacheStateEntry), lfc_ctl->limit);
|
||||
elog(LOG, "LFC: read state with %lu entries", (long)(rc / sizeof(FileCacheStateEntry)));
|
||||
|
||||
for (i = 0; i < max_entries; i++)
|
||||
{
|
||||
hash = get_hash_value(lfc_hash, &fs[i].key);
|
||||
entry = hash_search_with_hash_value(lfc_hash, &fs[i].key, hash, HASH_ENTER, NULL);
|
||||
entry->offset = i;
|
||||
entry->hash = hash;
|
||||
entry->access_count = 0;
|
||||
entry->prewarm_requested = true;
|
||||
entry->prewarm_started = false;
|
||||
memset(entry->bitmap, 0, sizeof entry->bitmap);
|
||||
/* Most recently visted pages are stored first */
|
||||
dlist_push_head(&lfc_ctl->lru, &entry->list_node);
|
||||
}
|
||||
Assert(lfc_ctl->size == 0);
|
||||
lfc_ctl->used = lfc_ctl->size = max_entries;
|
||||
lfc_state = fs;
|
||||
lfc_state_size = max_entries;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* Load pages from saved LFC state.
|
||||
*
|
||||
* Load is done by backgraound work. It can interfere with
|
||||
* accessed to the pages by other backends. Usually access to LFC is protected by shared buffers: when Postgres
|
||||
* is reading page, it pins shared buffer and enforces that only one backend is reading it, while other are waiting read completion.
|
||||
*
|
||||
* But it is not true for prewarming: backend can fetch page itself, modify and then write it to LFC. At the
|
||||
* same time prewarm BGW tries to write deteriorated image of this page in LFC. To increase concurrency, access to LFC files (both read and write)
|
||||
* is performed without holding locks. So it can happen that two or more processes write different content to the same location in the LFC file.
|
||||
* Certainly we can not rely on disk content in this case.
|
||||
*
|
||||
* To solve this problem with use two flags in LFC entry: `prewarm_requested` and `prewarm_started`. First is set by `lfc_init_prewarm` when it loads saved LFC state.
|
||||
* Prewarm BGW perform write to LFC file only if this flag is set. This flag is cleared if any other backend perform write to this LFC chunk.
|
||||
* In this case data loaded by prewarm BGW is considered to be deteriorated and should be just ignored.
|
||||
*
|
||||
* But as bat as far as write to LFC is performed without holding lock, there is no guarantee that such write is in progress.
|
||||
* This is why second flag is used: `prewarm_started`. It is set by prewarm BGW when is starts writing page and cleared when write is completed.
|
||||
* Any other backend writing to LFC should abandon it's write to LFC file (just not mark page as loaded in bitmap) once it sees this flag.
|
||||
* So nether prewarm BGW, nether backend are saving page in LFC - it is just skipped.
|
||||
*/
|
||||
void
|
||||
lfc_load_pages(void)
|
||||
{
|
||||
ssize_t rc;
|
||||
size_t snd_idx = 0, rcv_idx = 0;
|
||||
size_t n_sent = 0, n_received = 0;
|
||||
FileCacheEntry *entry;
|
||||
uint64 generation;
|
||||
uint32 entry_offset;
|
||||
uint32 hash;
|
||||
int shard_no;
|
||||
size_t i;
|
||||
FileCacheStateEntry* fs = lfc_state;
|
||||
size_t max_entries = lfc_state_size;
|
||||
|
||||
if (!lfc_ensure_opened())
|
||||
return;
|
||||
|
||||
if (max_entries == 0 || fs == NULL)
|
||||
{
|
||||
elog(LOG, "LFC: prewarm is disabled");
|
||||
return;
|
||||
}
|
||||
elog(LOG, "LFC: start loading %ld chunks", (long)max_entries);
|
||||
|
||||
while (true)
|
||||
{
|
||||
size_t chunk_no = snd_idx / BLOCKS_PER_CHUNK;
|
||||
size_t offs_in_chunk = snd_idx % BLOCKS_PER_CHUNK;
|
||||
if (chunk_no < max_entries)
|
||||
{
|
||||
if (fs[chunk_no].bitmap[offs_in_chunk >> 5] & (1 << (offs_in_chunk & 31)))
|
||||
{
|
||||
NeonGetPageRequest request = {
|
||||
.req.tag = T_NeonGetPageRequest,
|
||||
/* lsn and not_modified_since are filled in below */
|
||||
.rinfo = BufTagGetNRelFileInfo(fs[chunk_no].key),
|
||||
.forknum = fs[chunk_no].key.forkNum,
|
||||
.blkno = fs[chunk_no].key.blockNum + offs_in_chunk,
|
||||
.req.lsn = UINT64_MAX,
|
||||
.req.not_modified_since = 0
|
||||
};
|
||||
shard_no = get_shard_number(&fs[chunk_no].key);
|
||||
while (!page_server->send(shard_no, (NeonRequest *) &request)
|
||||
|| !page_server->flush(shard_no))
|
||||
{
|
||||
/* do nothing */
|
||||
}
|
||||
n_sent += 1;
|
||||
}
|
||||
snd_idx += 1;
|
||||
}
|
||||
if (n_sent >= n_received + lfc_prewarm_batch || chunk_no == max_entries)
|
||||
{
|
||||
NeonResponse * resp;
|
||||
do
|
||||
{
|
||||
chunk_no = rcv_idx / BLOCKS_PER_CHUNK;
|
||||
offs_in_chunk = rcv_idx % BLOCKS_PER_CHUNK;
|
||||
rcv_idx += 1;
|
||||
} while (!(fs[chunk_no].bitmap[offs_in_chunk >> 5] & (1 << (offs_in_chunk & 31))));
|
||||
|
||||
shard_no = get_shard_number(&fs[chunk_no].key);
|
||||
resp = page_server->receive(shard_no);
|
||||
|
||||
if (resp->tag != T_NeonGetPageResponse)
|
||||
{
|
||||
elog(LOG, "LFC: unexpected response type: %d", resp->tag);
|
||||
free(fs);
|
||||
return;
|
||||
}
|
||||
|
||||
hash = get_hash_value(lfc_hash, &fs[chunk_no].key);
|
||||
|
||||
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
|
||||
entry = hash_search_with_hash_value(lfc_hash, &fs[chunk_no].key, hash, HASH_FIND, NULL);
|
||||
if (entry != NULL && entry->prewarm_requested)
|
||||
{
|
||||
/* Unlink entry from LRU list to pin it for the duration of IO operation */
|
||||
if (entry->access_count++ == 0)
|
||||
dlist_delete(&entry->list_node);
|
||||
|
||||
generation = lfc_ctl->generation;
|
||||
entry_offset = entry->offset;
|
||||
Assert(!entry->prewarm_started);
|
||||
entry->prewarm_started = true;
|
||||
|
||||
LWLockRelease(lfc_lock);
|
||||
|
||||
rc = pwrite(lfc_desc, ((NeonGetPageResponse*)resp)->page, BLCKSZ, ((off_t) entry_offset * BLOCKS_PER_CHUNK + offs_in_chunk) * BLCKSZ);
|
||||
if (rc != BLCKSZ)
|
||||
{
|
||||
lfc_disable("write");
|
||||
break;
|
||||
}
|
||||
else
|
||||
{
|
||||
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
|
||||
|
||||
if (lfc_ctl->generation == generation)
|
||||
{
|
||||
CriticalAssert(LFC_ENABLED());
|
||||
if (--entry->access_count == 0)
|
||||
dlist_push_tail(&lfc_ctl->lru, &entry->list_node);
|
||||
if (entry->prewarm_requested)
|
||||
{
|
||||
lfc_ctl->used_pages += 1 - ((entry->bitmap[offs_in_chunk >> 5] >> (offs_in_chunk & 31)) & 1);
|
||||
entry->bitmap[offs_in_chunk >> 5] |= 1 << (offs_in_chunk & 31);
|
||||
}
|
||||
Assert(entry->prewarm_started);
|
||||
entry->prewarm_started = false;
|
||||
}
|
||||
|
||||
LWLockRelease(lfc_lock);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
Assert(!entry || !entry->prewarm_started);
|
||||
LWLockRelease(lfc_lock);
|
||||
}
|
||||
|
||||
if (n_sent == ++n_received)
|
||||
{
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
free(fs);
|
||||
elog(LOG, "LFC: complete prewarming: loaded %ld pages", (long)n_received);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* Check if page is present in the cache.
|
||||
* Returns true if page is found in local cache.
|
||||
@@ -899,6 +1247,17 @@ lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
|
||||
|
||||
if (found)
|
||||
{
|
||||
if (entry->prewarm_started)
|
||||
{
|
||||
/*
|
||||
* Some page of this chunk is currently written by prewarm BGW.
|
||||
* We should give-up not to interfere with it.
|
||||
* But clearing prewarm_requested flag also will not allow prewarm BGW to fix it result.
|
||||
*/
|
||||
entry->prewarm_requested = false;
|
||||
LWLockRelease(lfc_lock);
|
||||
return;
|
||||
}
|
||||
/*
|
||||
* Unlink entry from LRU list to pin it for the duration of IO
|
||||
* operation
|
||||
@@ -959,9 +1318,11 @@ lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
|
||||
}
|
||||
entry->access_count = 1;
|
||||
entry->hash = hash;
|
||||
entry->prewarm_started = false;
|
||||
memset(entry->bitmap, 0, sizeof entry->bitmap);
|
||||
}
|
||||
|
||||
entry->prewarm_requested = false; /* prohibit prewarm if LFC entry is updated by some backend */
|
||||
generation = lfc_ctl->generation;
|
||||
entry_offset = entry->offset;
|
||||
LWLockRelease(lfc_lock);
|
||||
|
||||
@@ -276,6 +276,8 @@ extern int lfc_cache_containsv(NRelFileInfo rinfo, ForkNumber forkNum,
|
||||
BlockNumber blkno, int nblocks, bits8 *bitmap);
|
||||
extern void lfc_evict(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno);
|
||||
extern void lfc_init(void);
|
||||
extern void lfc_save_state(void);
|
||||
extern void lfc_load_pages(void);
|
||||
|
||||
static inline bool
|
||||
lfc_read(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
|
||||
|
||||
40
test_runner/regress/test_lfc_prewarm.py
Normal file
40
test_runner/regress/test_lfc_prewarm.py
Normal file
@@ -0,0 +1,40 @@
|
||||
import time
|
||||
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import NeonEnv
|
||||
|
||||
|
||||
def test_lfc_prewarm(neon_simple_env: NeonEnv):
|
||||
env = neon_simple_env
|
||||
n_records = 1000000
|
||||
|
||||
endpoint = env.endpoints.create_start(
|
||||
branch_name="main",
|
||||
config_lines=[
|
||||
"autovacuum = off",
|
||||
"shared_buffers=1MB",
|
||||
"neon.max_file_cache_size=1GB",
|
||||
"neon.file_cache_size_limit=1GB",
|
||||
"neon.file_cache_prewarm_limit=1000",
|
||||
],
|
||||
)
|
||||
conn = endpoint.connect()
|
||||
cur = conn.cursor()
|
||||
cur.execute("create extension neon")
|
||||
cur.execute("create table t(pk integer primary key, payload text default repeat('?', 128))")
|
||||
cur.execute(f"insert into t (pk) values (generate_series(1,{n_records}))")
|
||||
|
||||
endpoint.stop()
|
||||
endpoint.start()
|
||||
|
||||
time.sleep(5) # give prewarm BGW some time to proceed
|
||||
|
||||
conn = endpoint.connect()
|
||||
cur = conn.cursor()
|
||||
cur.execute("select file_cache_used from neon_stat_file_cache")
|
||||
lfc_used = cur.fetchall()[0][0]
|
||||
log.info(f"Used LFC size: {lfc_used}")
|
||||
assert lfc_used > 100
|
||||
|
||||
cur.execute("select sum(pk) from t")
|
||||
assert cur.fetchall()[0][0] == n_records * (n_records + 1) / 2
|
||||
2
vendor/postgres-v14
vendored
2
vendor/postgres-v14
vendored
Submodule vendor/postgres-v14 updated: 2199b83fb7...ecb1020ff7
2
vendor/postgres-v15
vendored
2
vendor/postgres-v15
vendored
Submodule vendor/postgres-v15 updated: 22e580fe9f...a4830163a6
2
vendor/postgres-v16
vendored
2
vendor/postgres-v16
vendored
Submodule vendor/postgres-v16 updated: e131a9c027...cc36e03bd0
2
vendor/postgres-v17
vendored
2
vendor/postgres-v17
vendored
Submodule vendor/postgres-v17 updated: 68b5038f27...37d5ead146
8
vendor/revisions.json
vendored
8
vendor/revisions.json
vendored
@@ -1,18 +1,18 @@
|
||||
{
|
||||
"v17": [
|
||||
"17.0",
|
||||
"68b5038f27e493bde6ae552fe066f10cbdfe6a14"
|
||||
"37d5ead146b028dd9a5c07e7a37068ec0df9f465"
|
||||
],
|
||||
"v16": [
|
||||
"16.4",
|
||||
"e131a9c027b202ce92bd7b9cf2569d48a6f9948e"
|
||||
"cc36e03bd0c927022cf3b3563e291e42d75366a1"
|
||||
],
|
||||
"v15": [
|
||||
"15.8",
|
||||
"22e580fe9ffcea7e02592110b1c9bf426d83cada"
|
||||
"a4830163a65811578824ce4022c1cd3daef33d4e"
|
||||
],
|
||||
"v14": [
|
||||
"14.13",
|
||||
"2199b83fb72680001ce0f43bf6187a21dfb8f45d"
|
||||
"ecb1020ff71927e9dd59c526254bb8846bb73ee1"
|
||||
]
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user